Compare commits
22 Commits
feature/OR
...
feature/pl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d305521067 | ||
|
|
30d6dd0557 | ||
| 12e2691a24 | |||
|
|
c431a3d055 | ||
|
|
1d978caea7 | ||
| be27f506e3 | |||
|
|
8f11971bfc | ||
|
|
0653c2437f | ||
|
|
48b7707eb3 | ||
| 2fdc6856ba | |||
|
|
4ac449ff63 | ||
|
|
e6a7c6de8d | ||
|
|
0b924208dc | ||
| 2f0fd24670 | |||
|
|
6abdc220d2 | ||
|
|
51401a3ba9 | ||
|
|
0befc49b1e | ||
| fd554c8a5a | |||
|
|
c167c6930d | ||
|
|
49ecb48eb0 | ||
|
|
237732bc64 | ||
| 4e52e192e4 |
@@ -1,8 +1,10 @@
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import signal
|
||||
import time
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
@@ -14,6 +16,62 @@ from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment
|
||||
logger = logging.getLogger("orchestrator.launcher")
|
||||
|
||||
|
||||
def prune_run_logs(runs_dir, keep_days=30, keep_max=500, active_paths=None):
|
||||
"""L-2: best-effort rotation of per-run logs (<runs_dir>/*.log).
|
||||
|
||||
A log file is removed if it is older than keep_days OR it is not within the
|
||||
keep_max most-recent logs (whichever condition is met first). Only *.log
|
||||
files directly inside runs_dir are considered; non-.log files and
|
||||
subdirectories are never touched. Files whose path is in active_paths (the
|
||||
currently running log) are always kept.
|
||||
|
||||
Returns the number of files removed. Never raises: any error is logged and
|
||||
swallowed so log rotation can never bring the app down.
|
||||
"""
|
||||
removed = 0
|
||||
try:
|
||||
active = set()
|
||||
for ap in (active_paths or []):
|
||||
try:
|
||||
active.add(os.path.realpath(ap))
|
||||
except Exception:
|
||||
active.add(ap)
|
||||
|
||||
if not os.path.isdir(runs_dir):
|
||||
return 0
|
||||
|
||||
logs = []
|
||||
for name in os.listdir(runs_dir):
|
||||
if not name.endswith(".log"):
|
||||
continue
|
||||
path = os.path.join(runs_dir, name)
|
||||
if not os.path.isfile(path):
|
||||
continue
|
||||
if os.path.realpath(path) in active:
|
||||
continue
|
||||
try:
|
||||
mtime = os.path.getmtime(path)
|
||||
except OSError:
|
||||
continue
|
||||
logs.append((path, mtime))
|
||||
|
||||
logs.sort(key=lambda t: t[1], reverse=True)
|
||||
|
||||
cutoff = time.time() - keep_days * 86400
|
||||
for idx, (path, mtime) in enumerate(logs):
|
||||
too_old = mtime < cutoff
|
||||
over_max = idx >= keep_max
|
||||
if too_old or over_max:
|
||||
try:
|
||||
os.remove(path)
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
logger.warning(f"prune_run_logs: failed to remove {path}: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"prune_run_logs failed for {runs_dir}: {e}")
|
||||
return removed
|
||||
|
||||
|
||||
class AgentLauncher:
|
||||
"""Launch Claude CLI agents directly (binary mounted into container)."""
|
||||
|
||||
@@ -53,7 +111,10 @@ class AgentLauncher:
|
||||
}
|
||||
|
||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
||||
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
|
||||
# backward-compatible alias for the default; the actual value (and per-agent
|
||||
# overrides) live in settings and are resolved via _resolve_timeout().
|
||||
AGENT_TIMEOUT = settings.agent_timeout_seconds
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||
"""
|
||||
@@ -190,7 +251,7 @@ class AgentLauncher:
|
||||
t = threading.Thread(
|
||||
target=self._watchdog,
|
||||
args=(proc.pid, run_id),
|
||||
kwargs={"job_id": job_id},
|
||||
kwargs={"job_id": job_id, "agent": agent},
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
@@ -209,29 +270,100 @@ class AgentLauncher:
|
||||
notify_agent_started(run_id, agent, task_id)
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
||||
"""Kill agent if it exceeds timeout.
|
||||
@staticmethod
|
||||
def _resolve_timeout(agent: str = None) -> int:
|
||||
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
|
||||
|
||||
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
|
||||
like {"reviewer": 3600}) wins; otherwise the global default
|
||||
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
|
||||
(falls back to the default) and only logged, so a bad env never bricks runs.
|
||||
"""
|
||||
default = settings.agent_timeout_seconds
|
||||
raw = (settings.agent_timeout_overrides_json or "").strip()
|
||||
if agent and raw:
|
||||
try:
|
||||
overrides = json.loads(raw)
|
||||
if isinstance(overrides, dict) and agent in overrides:
|
||||
return int(overrides[agent])
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
|
||||
return default
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
|
||||
job_id: int = None, agent: str = None):
|
||||
"""Kill agent if it exceeds its timeout.
|
||||
|
||||
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
|
||||
to terminate the process and record the agent_runs exit. job_id is accepted
|
||||
for symmetry.
|
||||
|
||||
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
|
||||
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
|
||||
give the process up to settings.agent_kill_grace_seconds to flush and exit on
|
||||
its own, and only SIGKILL if it is still alive after the grace window. If the
|
||||
process exits during the grace window, SIGKILL is NOT sent.
|
||||
ProcessLookupError is tolerated at every step (the process may already be
|
||||
gone). The recorded exit_code stays -9 to match the existing retry/fail
|
||||
contract regardless of which signal actually reaped it.
|
||||
"""
|
||||
import time
|
||||
if timeout is None:
|
||||
timeout = self.AGENT_TIMEOUT
|
||||
timeout = self._resolve_timeout(agent)
|
||||
time.sleep(timeout)
|
||||
|
||||
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
|
||||
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
|
||||
return # nothing to record: the monitor's proc.wait() owns the exit
|
||||
|
||||
# Phase 2: poll for graceful exit within the grace window.
|
||||
grace = settings.agent_kill_grace_seconds
|
||||
poll_interval = 0.5
|
||||
waited = 0.0
|
||||
while waited < grace:
|
||||
time.sleep(poll_interval)
|
||||
waited += poll_interval
|
||||
try:
|
||||
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
|
||||
except ProcessLookupError:
|
||||
logger.info(
|
||||
f"Agent run_id={run_id} exited gracefully after SIGTERM "
|
||||
f"({waited:.1f}s); no SIGKILL needed"
|
||||
)
|
||||
self._record_kill(run_id)
|
||||
return
|
||||
|
||||
# Phase 3: still alive -> hard SIGKILL.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
|
||||
self._record_kill(run_id)
|
||||
|
||||
@staticmethod
|
||||
def _record_kill(run_id: int):
|
||||
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
|
||||
|
||||
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
|
||||
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB.
|
||||
@@ -339,7 +471,8 @@ class AgentLauncher:
|
||||
set_issue_blocked(_wid)
|
||||
plane_add_comment(
|
||||
_wid,
|
||||
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="deployer",
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")
|
||||
@@ -488,7 +621,15 @@ class AgentLauncher:
|
||||
pass
|
||||
|
||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||
"""After agent finishes successfully, advance the stage via the unified engine.
|
||||
|
||||
ORCH-4 / M-3: the 174-line body that used to live here moved into
|
||||
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
|
||||
the task by (repo, branch) and delegates. `agent` is forwarded as
|
||||
finished_agent so the analyst/reviewer/tester/architect rollback branches
|
||||
still trigger exactly as before. The agent-selection bug (it used to call
|
||||
get_agent_for_stage(next_stage)) is fixed inside the engine.
|
||||
"""
|
||||
try:
|
||||
conn = get_db()
|
||||
task_row = conn.execute(
|
||||
@@ -500,174 +641,15 @@ class AgentLauncher:
|
||||
return
|
||||
|
||||
task_id, current_stage, work_item_id = task_row
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
|
||||
if not next_stage:
|
||||
return
|
||||
|
||||
# Run QG check if defined
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
check_fn = QG_CHECKS[qg_name]
|
||||
if qg_name in ("check_analysis_approved",):
|
||||
# Requires human approval - post request comment if analyst just finished
|
||||
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if files_check:
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review
|
||||
from ..plane_sync import set_issue_in_review
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
|
||||
else:
|
||||
# Check if questions file exists (in the task worktree)
|
||||
import os as _os
|
||||
questions_path = _os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/01-questions.md"
|
||||
)
|
||||
if _os.path.isfile(questions_path):
|
||||
# Analyst has questions -> Needs Input
|
||||
from ..plane_sync import set_issue_needs_input
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
else:
|
||||
# No artifacts and no questions
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
|
||||
)
|
||||
return
|
||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
||||
# (repo, branch) signature — already worktree-aware.
|
||||
passed, reason = check_fn(repo, branch)
|
||||
elif qg_name == "check_tests_passed":
|
||||
# Artifact check — pass branch so it reads from the worktree.
|
||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
||||
else:
|
||||
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
|
||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
||||
|
||||
if not passed:
|
||||
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
|
||||
# If reviewer says REQUEST_CHANGES, rollback to development
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
# Count retries
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Task 6: Tester FAIL -> rollback to development
|
||||
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
)
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
from ..plane_sync import set_issue_blocked
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
|
||||
|
||||
# Task 8: Architect conflict -> rollback to analysis
|
||||
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
|
||||
import os as _os
|
||||
conflict_path = _os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
if _os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
|
||||
return
|
||||
elif qg_name:
|
||||
return
|
||||
|
||||
# Advance stage
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
|
||||
|
||||
# Launch next agent if defined
|
||||
next_agent = get_agent_for_stage(next_stage)
|
||||
if next_agent:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
|
||||
|
||||
from ..stage_engine import advance_stage
|
||||
advance_stage(
|
||||
task_id=task_id,
|
||||
current_stage=current_stage,
|
||||
repo=repo,
|
||||
work_item_id=work_item_id,
|
||||
branch=branch,
|
||||
finished_agent=agent,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
@@ -703,47 +685,6 @@ class AgentLauncher:
|
||||
logger.error(f"Failed to create PR for {branch}: {e}")
|
||||
return None
|
||||
|
||||
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
try:
|
||||
resp = httpx.get(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
prs = resp.json()
|
||||
if not prs:
|
||||
pr_number = self._ensure_pr(repo, branch, 0)
|
||||
if not pr_number:
|
||||
return False
|
||||
else:
|
||||
pr_number = prs[0]["number"]
|
||||
resp = httpx.post(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
|
||||
json={"Do": "merge"},
|
||||
headers=headers, timeout=30
|
||||
)
|
||||
if resp.status_code in (200, 204):
|
||||
logger.info(f"PR #{pr_number} merged for {branch}")
|
||||
update_task_stage(task_id, "done")
|
||||
notify_stage_change(task_id, "deploy", "done")
|
||||
plane_notify_stage(work_item_id, "deploy", "done")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
||||
return False
|
||||
|
||||
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
||||
"""Write task file directly into the task's worktree.
|
||||
|
||||
|
||||
@@ -9,6 +9,17 @@ class Settings(BaseSettings):
|
||||
plane_webhook_secret: str = ""
|
||||
plane_project_id: str = ""
|
||||
|
||||
# Per-agent Plane bot tokens (feat: per-agent comment authorship).
|
||||
# When set, add_comment posts under the matching bot so Plane shows the
|
||||
# real author (Analyst/Architect/...). Empty -> fallback to plane_api_token.
|
||||
plane_bot_analyst: str = ""
|
||||
plane_bot_architect: str = ""
|
||||
plane_bot_developer: str = ""
|
||||
plane_bot_reviewer: str = ""
|
||||
plane_bot_tester: str = ""
|
||||
plane_bot_deployer: str = ""
|
||||
plane_bot_stream: str = ""
|
||||
|
||||
# Gitea
|
||||
gitea_url: str = "http://localhost:3000"
|
||||
gitea_token: str = ""
|
||||
@@ -53,6 +64,28 @@ class Settings(BaseSettings):
|
||||
breaker_threshold: int = 3
|
||||
breaker_pause_seconds: int = 300
|
||||
|
||||
# ORCH-7 (M-2): agent timeout + graceful kill.
|
||||
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
|
||||
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
|
||||
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
|
||||
# flush artifacts before the hard kill
|
||||
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
|
||||
# agent_timeout_overrides_json -> optional per-agent override JSON object,
|
||||
# e.g. {"reviewer": 3600, "architect": 2700}
|
||||
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
|
||||
agent_timeout_seconds: int = 1800
|
||||
agent_kill_grace_seconds: int = 20
|
||||
agent_timeout_overrides_json: str = ""
|
||||
|
||||
# L-2: run-log rotation. Old per-run logs in <data>/runs/*.log are pruned at
|
||||
# app startup (best-effort). A *.log is removed if it is older than
|
||||
# log_keep_days OR not within the log_keep_max most-recent logs (whichever
|
||||
# hits first). Only *.log files are touched; the active run log is skipped.
|
||||
# log_keep_days -> max age in days (env ORCH_LOG_KEEP_DAYS).
|
||||
# log_keep_max -> max number of newest logs to retain (env ORCH_LOG_KEEP_MAX).
|
||||
log_keep_days: int = 30
|
||||
log_keep_max: int = 500
|
||||
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
|
||||
38
src/db.py
38
src/db.py
@@ -67,6 +67,17 @@ def init_db():
|
||||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
|
||||
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
|
||||
# (which have NULL delivery_id) never collide with each other. Restart-safe:
|
||||
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
|
||||
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
|
||||
_ensure_column(conn, "events", "delivery_id", "TEXT")
|
||||
conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
||||
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-5 (M-7): idempotent webhook event logging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def insert_event_dedup(
|
||||
source: str, event_type: str, payload: str, delivery_id: str
|
||||
) -> bool:
|
||||
"""Idempotently log a webhook event keyed by delivery_id.
|
||||
|
||||
Returns True if a NEW row was inserted (caller should dispatch the event) and
|
||||
False if this delivery_id was already present (a duplicate delivery -> caller
|
||||
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
|
||||
index idx_events_delivery; rowcount==1 means the row was actually inserted.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
cur = conn.execute(
|
||||
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
(source, event_type, payload, delivery_id),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount == 1
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1 (F-2b): job queue helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
16
src/main.py
16
src/main.py
@@ -60,6 +60,22 @@ async def lifespan(app: FastAPI):
|
||||
if requeued:
|
||||
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
|
||||
|
||||
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
|
||||
try:
|
||||
import os as _os
|
||||
from .config import settings as _settings
|
||||
from .agents.launcher import prune_run_logs
|
||||
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
|
||||
_removed = prune_run_logs(
|
||||
_runs_dir,
|
||||
keep_days=_settings.log_keep_days,
|
||||
keep_max=_settings.log_keep_max,
|
||||
)
|
||||
if _removed:
|
||||
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
|
||||
except Exception as e:
|
||||
log.warning(f"Log rotation skipped: {e}")
|
||||
|
||||
# Start the background job-queue worker (ORCH-1).
|
||||
from .queue_worker import worker
|
||||
worker.start()
|
||||
|
||||
@@ -6,9 +6,53 @@ from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.plane_sync")
|
||||
|
||||
# L-3: emoji literals used in Plane comment bodies, named for readability.
|
||||
# Message text stays byte-for-byte identical to the previous output.
|
||||
EMOJI_STAGE = "\U0001F504" # stage transition
|
||||
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
|
||||
EMOJI_DONE = "\u2705" # task completed
|
||||
|
||||
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
|
||||
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
|
||||
WORKSPACE = settings.plane_workspace_slug
|
||||
|
||||
# feat(plane): per-agent comment authorship.
|
||||
# Map an agent role -> its dedicated Plane bot token (read from config / env).
|
||||
# When the token is present, add_comment() POSTs under that bot so Plane shows
|
||||
# the real author. Empty/unknown role -> fallback to the shared orchestrator
|
||||
# token (PLANE_HEADERS), so commenting stays autonomous.
|
||||
PLANE_BOT_TOKENS = {
|
||||
"analyst": settings.plane_bot_analyst,
|
||||
"architect": settings.plane_bot_architect,
|
||||
"developer": settings.plane_bot_developer,
|
||||
"reviewer": settings.plane_bot_reviewer,
|
||||
"tester": settings.plane_bot_tester,
|
||||
"deployer": settings.plane_bot_deployer,
|
||||
"stream": settings.plane_bot_stream,
|
||||
}
|
||||
|
||||
# Map a pipeline stage -> the agent role that owns work in that stage. Used to
|
||||
# pick an author for rollback/stage notifications targeting a specific stage.
|
||||
STAGE_AUTHORS = {
|
||||
"analysis": "analyst",
|
||||
"architecture": "architect",
|
||||
"development": "developer",
|
||||
"review": "reviewer",
|
||||
"testing": "tester",
|
||||
"deploy": "deployer",
|
||||
}
|
||||
|
||||
|
||||
def _headers_for(author: str | None) -> dict:
|
||||
"""Return X-API-Key headers for the given agent role.
|
||||
|
||||
Falls back to the shared orchestrator token (PLANE_HEADERS /
|
||||
settings.plane_api_token) when the role is None, unknown, or its bot token
|
||||
is not configured. This keeps comment posting autonomous: a comment is
|
||||
always written, just attributed to the orchestrator if no bot is set.
|
||||
"""
|
||||
tok = PLANE_BOT_TOKENS.get(author or "") if author else None
|
||||
return {"X-API-Key": tok} if tok else PLANE_HEADERS
|
||||
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
|
||||
|
||||
@@ -65,6 +109,24 @@ STAGE_TO_STATE = {
|
||||
}
|
||||
|
||||
|
||||
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
|
||||
"""M-6: GET the Plane issue by UUID and return its sequence_id (the
|
||||
authoritative per-project number), or None if unavailable.
|
||||
|
||||
Returns None on network error, non-2xx, or a missing field - never raises,
|
||||
so the webhook handler can fall back to DB increment and stay autonomous.
|
||||
"""
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp.raise_for_status()
|
||||
seq = resp.json().get("sequence_id")
|
||||
return int(seq) if seq is not None else None
|
||||
except Exception as e:
|
||||
logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
||||
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
@@ -89,25 +151,26 @@ def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
results = data.get("results", data if isinstance(data, list) else [])
|
||||
# M-6: match by sequence_id directly (the authoritative per-project
|
||||
# number), parsed from the work_item_id suffix - no hardcoded prefix.
|
||||
try:
|
||||
target_num = int(work_item_id.rsplit("-", 1)[1])
|
||||
except (IndexError, ValueError):
|
||||
target_num = None
|
||||
for issue in results:
|
||||
seq = issue.get("sequence_id")
|
||||
identifier = f"ET-{seq:03d}" if seq else ""
|
||||
if identifier == work_item_id or work_item_id in issue.get("name", ""):
|
||||
if target_num is not None and issue.get("sequence_id") == target_num:
|
||||
return issue["id"]
|
||||
# Fallback: get all issues and match by sequence_id number
|
||||
if work_item_id.startswith("ET-"):
|
||||
try:
|
||||
target_num = int(work_item_id.split("-")[1])
|
||||
except (IndexError, ValueError):
|
||||
target_num = None
|
||||
if target_num:
|
||||
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp2.raise_for_status()
|
||||
data2 = resp2.json()
|
||||
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
|
||||
for issue in results2:
|
||||
if issue.get("sequence_id") == target_num:
|
||||
return issue["id"]
|
||||
if work_item_id in issue.get("name", ""):
|
||||
return issue["id"]
|
||||
# Fallback: get all issues and match by sequence_id number (any prefix)
|
||||
if target_num is not None:
|
||||
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp2.raise_for_status()
|
||||
data2 = resp2.json()
|
||||
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
|
||||
for issue in results2:
|
||||
if issue.get("sequence_id") == target_num:
|
||||
return issue["id"]
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to find issue for {work_item_id}: {e}")
|
||||
return None
|
||||
@@ -134,8 +197,14 @@ def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
|
||||
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
|
||||
|
||||
|
||||
def add_comment(work_item_id: str, text: str, project_id: str = None):
|
||||
"""Add a comment to Plane issue."""
|
||||
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
|
||||
"""Add a comment to a Plane issue.
|
||||
|
||||
feat(plane): when ``author`` (an agent role) maps to a configured bot
|
||||
token, the comment is POSTed under that bot so Plane shows the real author.
|
||||
Otherwise it falls back to the shared orchestrator token (see
|
||||
``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.
|
||||
"""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
issue_id = find_issue_id(work_item_id, project_id)
|
||||
if not issue_id:
|
||||
@@ -145,9 +214,9 @@ def add_comment(work_item_id: str, text: str, project_id: str = None):
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
|
||||
html = f"<p>{text}</p>"
|
||||
try:
|
||||
resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10)
|
||||
resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
|
||||
resp.raise_for_status()
|
||||
logger.info(f"Plane: comment added to {work_item_id}")
|
||||
logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add comment to {work_item_id}: {e}")
|
||||
|
||||
@@ -194,7 +263,7 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
update_issue_state(work_item_id, new_stage, project_id)
|
||||
|
||||
msg = f"🔄 Stage: {old_stage} → {new_stage}"
|
||||
msg = f"{EMOJI_STAGE} Stage: {old_stage} → {new_stage}"
|
||||
if agent:
|
||||
msg += f" (launching {agent})"
|
||||
|
||||
@@ -227,16 +296,29 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
add_comment(work_item_id, msg, project_id)
|
||||
# Stage transition is the orchestrator's own voice -> attribute to stream.
|
||||
add_comment(work_item_id, msg, project_id, author="stream")
|
||||
|
||||
|
||||
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
|
||||
"""Notify Plane about QG failure."""
|
||||
add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check} — {reason}", project_id)
|
||||
# QG failure belongs to the agent that owns the failing stage.
|
||||
add_comment(
|
||||
work_item_id,
|
||||
f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}",
|
||||
project_id,
|
||||
author=STAGE_AUTHORS.get(stage, "stream"),
|
||||
)
|
||||
|
||||
|
||||
def notify_done(work_item_id: str, project_id: str = None):
|
||||
"""Mark issue as Done in Plane."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
update_issue_state(work_item_id, "done", project_id)
|
||||
add_comment(work_item_id, "✅ Task completed! PR merged and deployed.", project_id)
|
||||
# Deploy finished the task -> attribute the completion comment to Deployer.
|
||||
add_comment(
|
||||
work_item_id,
|
||||
f"{EMOJI_DONE} Task completed! PR merged and deployed.",
|
||||
project_id,
|
||||
author="deployer",
|
||||
)
|
||||
|
||||
430
src/stage_engine.py
Normal file
430
src/stage_engine.py
Normal file
@@ -0,0 +1,430 @@
|
||||
"""Unified stage engine (ORCH-4 / M-3).
|
||||
|
||||
Single source of truth for "an agent finished / a human approved -> run the
|
||||
stage's quality gate and either advance the pipeline or roll it back".
|
||||
|
||||
Before ORCH-4 this logic was duplicated in two places that had silently
|
||||
diverged:
|
||||
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
|
||||
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
|
||||
rollback+retry, architect conflict rollback) — but it picked the next agent
|
||||
with get_agent_for_stage(next_stage), which is WRONG.
|
||||
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
|
||||
check_review_approved PR-by-branch dispatch and used the CORRECT
|
||||
get_agent_for_stage(current_stage)).
|
||||
|
||||
This module merges both into one sync `advance_stage(...)`. launcher calls it
|
||||
directly; the plane webhook calls it through asyncio.to_thread so there is
|
||||
exactly one implementation.
|
||||
|
||||
Agent-selection bug fix (ORCH-4):
|
||||
stages.py defines `agent` as "the agent to launch when advancing FROM this
|
||||
stage". So when advancing current -> next, the correct agent to launch is
|
||||
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
|
||||
stage (e.g. analysis->architecture launched 'developer' instead of
|
||||
'architect'). plane and gitea already used current_stage; we unify on that.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .db import get_db, update_task_stage, enqueue_job
|
||||
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from .git_worktree import get_worktree_path
|
||||
from .qg.checks import QG_CHECKS
|
||||
from .notifications import (
|
||||
notify_stage_change,
|
||||
notify_qg_failure,
|
||||
notify_approve_requested,
|
||||
send_telegram,
|
||||
)
|
||||
from .plane_sync import (
|
||||
notify_stage_change as plane_notify_stage,
|
||||
notify_qg_failure as plane_notify_qg,
|
||||
add_comment as plane_add_comment,
|
||||
set_issue_in_review,
|
||||
set_issue_needs_input,
|
||||
set_issue_in_progress,
|
||||
set_issue_blocked,
|
||||
)
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.stage_engine")
|
||||
|
||||
MAX_DEVELOPER_RETRIES = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class AdvanceResult:
|
||||
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
|
||||
|
||||
advanced: bool = False
|
||||
from_stage: str | None = None
|
||||
to_stage: str | None = None
|
||||
enqueued_agent: str | None = None
|
||||
enqueued_job_id: int | None = None
|
||||
qg_name: str | None = None
|
||||
qg_passed: bool | None = None
|
||||
qg_reason: str | None = None
|
||||
rolled_back_to: str | None = None
|
||||
alerted: bool = False
|
||||
note: str | None = None
|
||||
notes: list = field(default_factory=list)
|
||||
|
||||
|
||||
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
|
||||
"""Dispatch a quality-gate check to the right signature and run it.
|
||||
|
||||
Signatures (unified from launcher + plane):
|
||||
- check_ci_green / check_tests_local -> (repo, branch)
|
||||
- check_review_approved -> (repo, pr_number) [PR found by branch]
|
||||
- everything else (artifact checks) -> (repo, work_item_id, branch)
|
||||
|
||||
Returns (passed: bool, reason: str).
|
||||
"""
|
||||
check_fn = QG_CHECKS.get(qg_name)
|
||||
if not check_fn:
|
||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||
return False, f"Unknown QG: {qg_name}"
|
||||
|
||||
if qg_name in ("check_ci_green", "check_tests_local"):
|
||||
# (repo, branch) — already worktree-aware.
|
||||
return check_fn(repo, branch)
|
||||
|
||||
if qg_name == "check_review_approved":
|
||||
# Special case kept from plane: find the open PR for this branch via
|
||||
# Gitea, then check it; fall back to a file-based review marker.
|
||||
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
|
||||
|
||||
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
|
||||
# check reads from the task worktree (ORCH-2 / S-4).
|
||||
return check_fn(repo, work_item_id or "", branch)
|
||||
|
||||
|
||||
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
|
||||
"""check_review_approved dispatch preserved from plane._try_advance_stage.
|
||||
|
||||
Finds the open PR whose head ref == branch via the Gitea API and runs
|
||||
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
|
||||
file-based review marker (12-review.md / 09-review.md) like the original.
|
||||
"""
|
||||
import httpx as _httpx
|
||||
|
||||
owner = settings.gitea_owner
|
||||
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
try:
|
||||
resp = _httpx.get(url, headers=headers, timeout=10)
|
||||
prs = resp.json()
|
||||
pr_number = None
|
||||
for pr in prs:
|
||||
if pr.get("head", {}).get("ref") == branch:
|
||||
pr_number = pr["number"]
|
||||
break
|
||||
if pr_number:
|
||||
return check_fn(repo, pr_number)
|
||||
# No open PR but a review file may exist — check file-based.
|
||||
wt = get_worktree_path(repo, branch)
|
||||
if not os.path.isdir(wt):
|
||||
wt = os.path.join(settings.repos_dir, repo)
|
||||
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||
if os.path.isfile(review_path) or os.path.isfile(review_path2):
|
||||
return True, "Review file exists (file-based approval)"
|
||||
return False, "No open PR found and no review file"
|
||||
except Exception as e:
|
||||
return False, f"Error finding PR: {e}"
|
||||
|
||||
|
||||
def _developer_retry_count(task_id: int) -> int:
|
||||
"""How many developer runs have already happened for this task."""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,),
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def advance_stage(
|
||||
task_id: int,
|
||||
current_stage: str,
|
||||
repo: str,
|
||||
work_item_id: str,
|
||||
branch: str,
|
||||
finished_agent: str | None = None,
|
||||
) -> AdvanceResult:
|
||||
"""Run the current stage's quality gate and advance / roll back the pipeline.
|
||||
|
||||
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
|
||||
the async plane webhook calls it via asyncio.to_thread.
|
||||
|
||||
Args:
|
||||
task_id: tasks.id
|
||||
current_stage: the stage the task is currently in
|
||||
repo: repository name
|
||||
work_item_id: Plane work item id (may be "" / None)
|
||||
branch: feature branch
|
||||
finished_agent: the agent that just finished (launcher path). Drives the
|
||||
approved/REQUEST_CHANGES/tester/architect branches. In the
|
||||
plane webhook path it is None, so those agent-specific
|
||||
branches simply do not trigger (matches old plane behavior).
|
||||
|
||||
Returns AdvanceResult describing what happened.
|
||||
"""
|
||||
result = AdvanceResult(from_stage=current_stage)
|
||||
agent = finished_agent
|
||||
try:
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
result.qg_name = qg_name
|
||||
result.to_stage = next_stage
|
||||
|
||||
if not next_stage:
|
||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||
result.note = "terminal"
|
||||
return result
|
||||
|
||||
# --- Quality gate ----------------------------------------------------
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||
if qg_name == "check_analysis_approved":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
|
||||
elif qg_name:
|
||||
# QG name set but not registered — do not advance (launcher behavior).
|
||||
result.note = f"qg '{qg_name}' not in registry"
|
||||
return result
|
||||
|
||||
# --- Advance ---------------------------------------------------------
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
result.advanced = True
|
||||
logger.info(
|
||||
f"Task {task_id}: {current_stage} -> {next_stage} "
|
||||
f"(auto-advance after {agent})"
|
||||
)
|
||||
|
||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||
next_agent = get_agent_for_stage(current_stage)
|
||||
if next_agent:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\n"
|
||||
f"Branch: {branch}\nStage: {next_stage}"
|
||||
)
|
||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = next_agent
|
||||
result.enqueued_job_id = new_job_id
|
||||
logger.info(
|
||||
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
|
||||
result.note = f"error: {e}"
|
||||
return result
|
||||
|
||||
|
||||
def _handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||
):
|
||||
"""Analyst approved-flow (launcher only).
|
||||
|
||||
Only triggers when the analyst just finished (agent == 'analyst') in the
|
||||
launcher path. Decides between: artifacts ready -> In Review + request
|
||||
:approved:; questions file -> Needs Input; otherwise a warning comment.
|
||||
This gate never advances on its own (human approval does that via the plane
|
||||
webhook), matching the original launcher behavior.
|
||||
"""
|
||||
result.qg_name = "check_analysis_approved"
|
||||
result.note = "analysis-approval-gate"
|
||||
if not (agent == "analyst" and work_item_id):
|
||||
return
|
||||
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if not files_check:
|
||||
return
|
||||
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review, ask for :approved:.
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
||||
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
||||
author="analyst",
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
result.note = "analysis-in-review"
|
||||
logger.info(
|
||||
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
||||
)
|
||||
return
|
||||
|
||||
questions_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/01-questions.md",
|
||||
)
|
||||
if os.path.isfile(questions_path):
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
|
||||
author="analyst",
|
||||
)
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
result.note = "analysis-needs-input"
|
||||
return
|
||||
|
||||
# No artifacts and no questions.
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
|
||||
author="analyst",
|
||||
)
|
||||
result.note = "analysis-empty"
|
||||
|
||||
|
||||
def _handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result: AdvanceResult,
|
||||
):
|
||||
"""All rollback/retry branches from the original launcher, preserved verbatim.
|
||||
|
||||
Only fire on the launcher path (finished_agent is set). The webhook path
|
||||
passes finished_agent=None, so none of these agent-specific branches trigger
|
||||
— that matches the old plane behavior (it just reported the QG failure).
|
||||
"""
|
||||
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
send_telegram(
|
||||
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
|
||||
f"Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
|
||||
if agent == "tester" and qg_name == "check_tests_passed":
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
|
||||
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="tester",
|
||||
)
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(
|
||||
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
|
||||
f"retries. Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
|
||||
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
|
||||
if agent == "architect" and qg_name == "check_architecture_done":
|
||||
conflict_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md",
|
||||
)
|
||||
if os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
result.rolled_back_to = "analysis"
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
|
||||
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
|
||||
author="architect",
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "analyst"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
@@ -5,7 +5,7 @@ Stages:
|
||||
|
||||
Each stage defines:
|
||||
- next: the stage to advance to
|
||||
- agent: the agent to launch when entering the NEXT stage
|
||||
- agent: the agent to launch when advancing FROM this stage (NOT the next stage's agent)
|
||||
- qg: the quality gate check required to leave this stage
|
||||
"""
|
||||
|
||||
|
||||
52
src/webhooks/_dedup.py
Normal file
52
src/webhooks/_dedup.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
|
||||
|
||||
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
|
||||
manual replay. Without idempotency a retried delivery re-enters the pipeline and
|
||||
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
|
||||
repo). This module computes a stable per-delivery id so the webhook handlers can
|
||||
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
|
||||
|
||||
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
|
||||
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
|
||||
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
|
||||
body (a retried identical body yields the same hash).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
|
||||
|
||||
def _sha256_hex(*parts: str) -> str:
|
||||
h = hashlib.sha256()
|
||||
for p in parts:
|
||||
h.update(p.encode("utf-8", "replace"))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
|
||||
"""Compute the delivery_id for a Gitea webhook.
|
||||
|
||||
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
|
||||
sha256(source + event_type + body) so a retried identical body still maps to
|
||||
one id even if Gitea omitted the header.
|
||||
"""
|
||||
raw = (headers.get("X-Gitea-Delivery") or "").strip()
|
||||
if not raw:
|
||||
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
|
||||
return f"gitea:{raw}"
|
||||
|
||||
|
||||
def plane_delivery_id(headers, body: bytes) -> str:
|
||||
"""Compute the delivery_id for a Plane webhook.
|
||||
|
||||
Plane does not reliably send a delivery header, so we try a couple of common
|
||||
names and otherwise fall back to sha256("plane" + body): a retried identical
|
||||
body yields the same id.
|
||||
"""
|
||||
raw = (
|
||||
headers.get("X-Plane-Delivery")
|
||||
or headers.get("X-Hook-Delivery")
|
||||
or ""
|
||||
).strip()
|
||||
if not raw:
|
||||
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
|
||||
return f"plane:{raw}"
|
||||
@@ -10,7 +10,14 @@ import httpx
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..db import (
|
||||
get_db,
|
||||
get_task_by_repo_branch,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import gitea_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage
|
||||
from ..qg.checks import check_ci_green, check_review_approved
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
|
||||
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
|
||||
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
|
||||
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
|
||||
# Runs AFTER HMAC verification above.
|
||||
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("gitea", event_type, body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
delivery_id = gitea_delivery_id(request.headers, event_type, body)
|
||||
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
if event_type == "push":
|
||||
await handle_push(payload)
|
||||
|
||||
@@ -15,7 +15,9 @@ from ..db import (
|
||||
get_next_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import plane_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("plane", payload.get("event", "unknown"), body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
|
||||
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
|
||||
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
|
||||
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
|
||||
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
|
||||
# project still falls through to the filter below and returns {"status":"ignored"}.
|
||||
event_type = payload.get("event", "unknown")
|
||||
delivery_id = plane_delivery_id(request.headers, body)
|
||||
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
event = payload.get("event")
|
||||
action = payload.get("action", "")
|
||||
@@ -148,8 +154,20 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
logger.info(f"QG-0 failed for {plane_id}: {errors}")
|
||||
return
|
||||
|
||||
# Generate work item ID
|
||||
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
|
||||
# Generate work item ID.
|
||||
# M-6: source of truth for the number is the Plane sequence_id. Fetch it by
|
||||
# issue UUID; if Plane is unavailable, fall back to the DB increment so a
|
||||
# Plane outage never blocks task creation (autonomy > exact numbering).
|
||||
from ..plane_sync import fetch_issue_sequence_id
|
||||
seq = fetch_issue_sequence_id(plane_id, plane_project_id)
|
||||
if seq is not None:
|
||||
work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
|
||||
else:
|
||||
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
|
||||
logger.warning(
|
||||
f"Plane sequence_id unavailable for {plane_id}, "
|
||||
f"fell back to DB increment: {work_item_id}"
|
||||
)
|
||||
|
||||
# Create slug from name
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
||||
@@ -191,7 +209,7 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
||||
|
||||
@@ -234,7 +252,7 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||
else:
|
||||
# Rollback to previous stage
|
||||
@@ -245,8 +263,12 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
set_issue_in_progress(work_item_id)
|
||||
notify_stage_change(task_id, current_stage, prev_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, prev_stage)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}")
|
||||
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
|
||||
_plane_comment(
|
||||
work_item_id,
|
||||
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
|
||||
author=STAGE_AUTHORS.get(prev_stage, "stream"),
|
||||
)
|
||||
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
|
||||
return
|
||||
|
||||
@@ -292,7 +314,8 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
_pc(
|
||||
work_item_id,
|
||||
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
|
||||
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430."
|
||||
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
|
||||
author="analyst",
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
|
||||
@@ -308,7 +331,7 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _pc2
|
||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
except Exception as e:
|
||||
@@ -318,81 +341,30 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
async def _try_advance_stage(
|
||||
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
|
||||
):
|
||||
"""Run QG check for current stage and advance if passed."""
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
|
||||
|
||||
if not next_stage:
|
||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||
return
|
||||
The QG dispatch (including the check_review_approved PR-by-branch logic) and
|
||||
the advance/launch logic now live in src/stage_engine.advance_stage(), which
|
||||
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
||||
is exactly one implementation shared with the launcher.
|
||||
|
||||
# Run QG check if one is required
|
||||
if qg_name:
|
||||
qg_func = QG_CHECKS.get(qg_name)
|
||||
if not qg_func:
|
||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||
return
|
||||
finished_agent is None on this webhook path (a human :approved: comment, not
|
||||
a finished agent), so the agent-specific rollback branches inside the engine
|
||||
intentionally do not trigger — identical to the old plane behavior, which
|
||||
only ran the QG and either advanced or reported the failure.
|
||||
"""
|
||||
import asyncio
|
||||
from ..stage_engine import advance_stage
|
||||
|
||||
# Determine args based on QG function
|
||||
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
|
||||
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
|
||||
passed, reason = qg_func(repo, work_item_id, branch)
|
||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
||||
passed, reason = qg_func(repo, branch)
|
||||
elif qg_name == "check_review_approved":
|
||||
# Find PR number by branch via Gitea API
|
||||
import httpx as _httpx
|
||||
from ..config import settings as _s
|
||||
_owner = _s.gitea_owner
|
||||
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
|
||||
_headers = {"Authorization": f"token {_s.gitea_token}"}
|
||||
try:
|
||||
_resp = _httpx.get(_url, headers=_headers, timeout=10)
|
||||
_prs = _resp.json()
|
||||
_pr_number = None
|
||||
for _pr in _prs:
|
||||
if _pr.get("head", {}).get("ref") == branch:
|
||||
_pr_number = _pr["number"]
|
||||
break
|
||||
if _pr_number:
|
||||
passed, reason = qg_func(repo, _pr_number)
|
||||
else:
|
||||
# No open PR but review file exists — check file-based
|
||||
import os
|
||||
from ..git_worktree import get_worktree_path as _gwp
|
||||
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
|
||||
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
|
||||
passed, reason = True, "Review file exists (file-based approval)"
|
||||
else:
|
||||
passed, reason = False, "No open PR found and no review file"
|
||||
except Exception as _e:
|
||||
passed, reason = False, f"Error finding PR: {_e}"
|
||||
else:
|
||||
passed, reason = False, f"Unknown QG: {qg_name}"
|
||||
|
||||
if not passed:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
return
|
||||
|
||||
# Advance stage
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
|
||||
# Launch agent associated with the current stage's transition
|
||||
agent = get_agent_for_stage(current_stage)
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
||||
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
logger.error(f"Agent launch failed: {e}")
|
||||
await asyncio.to_thread(
|
||||
advance_stage,
|
||||
task_id,
|
||||
current_stage,
|
||||
repo,
|
||||
work_item_id,
|
||||
branch,
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
async def _create_gitea_branch(repo: str, branch: str):
|
||||
|
||||
@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
|
||||
the YAML frontmatter only (no fragile substring matching).
|
||||
"""
|
||||
import os
|
||||
import signal
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
from src.agents.launcher import AgentLauncher
|
||||
from src.qg.checks import check_reviewer_verdict
|
||||
from src.config import settings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
|
||||
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-7 (M-4): dead code removed
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDeadCodeRemoved:
|
||||
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
|
||||
removed. _ensure_pr (used by the auto-advance path) must stay."""
|
||||
|
||||
def test_auto_merge_pr_is_gone(self):
|
||||
assert not hasattr(AgentLauncher, "_auto_merge_pr")
|
||||
|
||||
def test_ensure_pr_still_present(self):
|
||||
assert hasattr(AgentLauncher, "_ensure_pr")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-7 (M-2): configurable timeout + per-agent override
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestResolveTimeout:
|
||||
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
|
||||
|
||||
def test_default_when_no_override(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
|
||||
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||
assert AgentLauncher._resolve_timeout(None) == 1800
|
||||
|
||||
def test_override_for_specific_agent(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(
|
||||
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
|
||||
)
|
||||
assert AgentLauncher._resolve_timeout("reviewer") == 3600
|
||||
assert AgentLauncher._resolve_timeout("architect") == 2700
|
||||
# an agent not in the override map falls back to the default
|
||||
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||
|
||||
def test_malformed_override_falls_back_to_default(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
|
||||
# must not raise, must return the default
|
||||
assert AgentLauncher._resolve_timeout("reviewer") == 1800
|
||||
|
||||
|
||||
class TestWatchdogGracefulKill:
|
||||
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
|
||||
and ProcessLookupError tolerance. The OS process is fully faked: we record the
|
||||
signals sent and decide liveness from a script, so no real process is touched."""
|
||||
|
||||
def _patch_db(self, monkeypatch):
|
||||
"""Stub get_db so _record_kill does not need a real DB."""
|
||||
class _Conn:
|
||||
def execute(self, *a, **k):
|
||||
return self
|
||||
def commit(self):
|
||||
pass
|
||||
def close(self):
|
||||
pass
|
||||
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
|
||||
|
||||
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
|
||||
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
sent.append(sig)
|
||||
# signal 0 (liveness probe) -> always alive; never raise
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGTERM in sent
|
||||
assert signal.SIGKILL in sent
|
||||
# SIGTERM must come before SIGKILL
|
||||
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
|
||||
|
||||
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
|
||||
"""Process dies during the grace window -> SIGKILL is NOT sent."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
state = {"alive": True, "probes": 0}
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
if sig == 0:
|
||||
state["probes"] += 1
|
||||
# die on the 2nd liveness probe (within grace)
|
||||
if state["probes"] >= 2:
|
||||
raise ProcessLookupError
|
||||
return None
|
||||
sent.append(sig)
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGTERM in sent
|
||||
assert signal.SIGKILL not in sent
|
||||
|
||||
def test_already_dead_before_sigterm(self, monkeypatch):
|
||||
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
|
||||
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
if sig == signal.SIGTERM:
|
||||
raise ProcessLookupError
|
||||
sent.append(sig)
|
||||
return None
|
||||
|
||||
recorded = {"called": False}
|
||||
monkeypatch.setattr(
|
||||
AgentLauncher, "_record_kill",
|
||||
staticmethod(lambda rid: recorded.__setitem__("called", True)),
|
||||
)
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
# must not raise
|
||||
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGKILL not in sent
|
||||
assert recorded["called"] is False
|
||||
|
||||
92
tests/test_log_rotation.py
Normal file
92
tests/test_log_rotation.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""L-2: tests for prune_run_logs (run-log rotation).
|
||||
|
||||
Verifies that old / surplus *.log files are removed while fresh logs, non-.log
|
||||
files, the active log, and subdirectories are left intact. Function is
|
||||
best-effort and must never raise.
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
|
||||
from src.agents.launcher import prune_run_logs
|
||||
|
||||
|
||||
def _touch(path, age_days=0):
|
||||
with open(path, "w") as f:
|
||||
f.write("x")
|
||||
mtime = time.time() - age_days * 86400
|
||||
os.utime(path, (mtime, mtime))
|
||||
return path
|
||||
|
||||
|
||||
def test_old_logs_removed_fresh_kept(tmp_path):
|
||||
runs = tmp_path
|
||||
fresh = _touch(str(runs / "1.log"), age_days=1)
|
||||
old = _touch(str(runs / "2.log"), age_days=40)
|
||||
|
||||
removed = prune_run_logs(str(runs), keep_days=30, keep_max=500)
|
||||
|
||||
assert removed == 1
|
||||
assert os.path.exists(fresh)
|
||||
assert not os.path.exists(old)
|
||||
|
||||
|
||||
def test_non_log_files_untouched(tmp_path):
|
||||
runs = tmp_path
|
||||
old_log = _touch(str(runs / "stale.log"), age_days=99)
|
||||
keep_txt = _touch(str(runs / "notes.txt"), age_days=99)
|
||||
keep_db = _touch(str(runs / "orchestrator.db"), age_days=99)
|
||||
|
||||
prune_run_logs(str(runs), keep_days=30, keep_max=500)
|
||||
|
||||
assert not os.path.exists(old_log)
|
||||
assert os.path.exists(keep_txt)
|
||||
assert os.path.exists(keep_db)
|
||||
|
||||
|
||||
def test_keep_max_retains_newest(tmp_path):
|
||||
runs = tmp_path
|
||||
# 5 logs, all recent (within keep_days), increasing age 0..4 days.
|
||||
paths = []
|
||||
for i in range(5):
|
||||
paths.append(_touch(str(runs / f"{i}.log"), age_days=i))
|
||||
|
||||
removed = prune_run_logs(str(runs), keep_days=365, keep_max=2)
|
||||
|
||||
# Only the 2 newest (age 0, 1) survive.
|
||||
assert removed == 3
|
||||
assert os.path.exists(paths[0])
|
||||
assert os.path.exists(paths[1])
|
||||
for p in paths[2:]:
|
||||
assert not os.path.exists(p)
|
||||
|
||||
|
||||
def test_active_log_never_removed(tmp_path):
|
||||
runs = tmp_path
|
||||
active = _touch(str(runs / "active.log"), age_days=99)
|
||||
other = _touch(str(runs / "other.log"), age_days=99)
|
||||
|
||||
removed = prune_run_logs(
|
||||
str(runs), keep_days=30, keep_max=500, active_paths=[active]
|
||||
)
|
||||
|
||||
assert removed == 1
|
||||
assert os.path.exists(active)
|
||||
assert not os.path.exists(other)
|
||||
|
||||
|
||||
def test_subdirs_untouched(tmp_path):
|
||||
runs = tmp_path
|
||||
sub = runs / "sub.log"
|
||||
sub.mkdir() # a directory that happens to end in .log
|
||||
old_log = _touch(str(runs / "old.log"), age_days=99)
|
||||
|
||||
prune_run_logs(str(runs), keep_days=30, keep_max=500)
|
||||
|
||||
assert sub.is_dir()
|
||||
assert not os.path.exists(old_log)
|
||||
|
||||
|
||||
def test_missing_dir_is_noop(tmp_path):
|
||||
missing = tmp_path / "does-not-exist"
|
||||
# Must not raise.
|
||||
assert prune_run_logs(str(missing)) == 0
|
||||
181
tests/test_m6_sequence.py
Normal file
181
tests/test_m6_sequence.py
Normal file
@@ -0,0 +1,181 @@
|
||||
"""M-6: work_item_id derived from Plane sequence_id (source of truth = Plane).
|
||||
|
||||
Covers:
|
||||
* fetch_issue_sequence_id returns int on a valid Plane response (mocked httpx);
|
||||
* returns None on network error / missing field WITHOUT raising;
|
||||
* handle_work_item_created uses prefix-NNN when seq is available, and falls
|
||||
back to get_next_work_item_id when seq is None (Plane down => autonomy);
|
||||
* find_issue_id no longer hardcodes 'ET-' and matches an arbitrary prefix
|
||||
(e.g. ORCH-005) by sequence_id.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_m6.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import patch, AsyncMock, MagicMock # noqa: E402
|
||||
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
import src.plane_sync as plane_sync # noqa: E402
|
||||
|
||||
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}},'
|
||||
f' {{"plane_project_id": "{ORCH_PLANE_ID}", "repo": "orchestrator",'
|
||||
f' "work_item_prefix": "ORCH", "name": "orchestrator"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
|
||||
yield
|
||||
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _mock_resp(json_body, status=200):
|
||||
m = MagicMock()
|
||||
m.json.return_value = json_body
|
||||
m.raise_for_status.return_value = None
|
||||
if status >= 400:
|
||||
def _raise():
|
||||
raise RuntimeError(f"HTTP {status}")
|
||||
m.raise_for_status.side_effect = _raise
|
||||
return m
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# fetch_issue_sequence_id
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_fetch_sequence_id_returns_int():
|
||||
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"sequence_id": 42})):
|
||||
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
|
||||
assert seq == 42
|
||||
assert isinstance(seq, int)
|
||||
|
||||
|
||||
def test_fetch_sequence_id_network_error_returns_none():
|
||||
with patch.object(plane_sync.httpx, "get", side_effect=RuntimeError("connection refused")):
|
||||
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
|
||||
assert seq is None # must not raise
|
||||
|
||||
|
||||
def test_fetch_sequence_id_missing_field_returns_none():
|
||||
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"error": "not found"})):
|
||||
seq = plane_sync.fetch_issue_sequence_id("missing-uuid", "proj-uuid")
|
||||
assert seq is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# handle_work_item_created: seq available -> prefix-NNN
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
|
||||
return client.post(
|
||||
"/webhook/plane",
|
||||
json={
|
||||
"event": "work_item.created",
|
||||
"data": {
|
||||
"id": plane_id,
|
||||
"name": name,
|
||||
"description_stripped": "This is a sufficiently long description.",
|
||||
"project": plane_project_id,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.launcher")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=7)
|
||||
def test_created_uses_plane_sequence_id(mock_fetch, mock_branch, mock_docs, mock_launcher):
|
||||
mock_launcher.launch.return_value = 1
|
||||
resp = _post("seq-issue")
|
||||
assert resp.status_code == 200
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='seq-issue'").fetchone()
|
||||
conn.close()
|
||||
assert task is not None
|
||||
assert task["work_item_id"] == "ORCH-007"
|
||||
mock_fetch.assert_called_once()
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.launcher")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
|
||||
@patch("src.webhooks.plane.get_next_work_item_id", return_value="ORCH-099")
|
||||
def test_created_falls_back_to_db_when_plane_down(
|
||||
mock_next, mock_fetch, mock_branch, mock_docs, mock_launcher
|
||||
):
|
||||
"""Plane unavailable (seq=None) => fall back to DB increment; task still created."""
|
||||
mock_launcher.launch.return_value = 1
|
||||
resp = _post("fallback-issue")
|
||||
assert resp.status_code == 200
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='fallback-issue'").fetchone()
|
||||
conn.close()
|
||||
assert task is not None # autonomy: Plane down does not block creation
|
||||
assert task["work_item_id"] == "ORCH-099"
|
||||
mock_next.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# find_issue_id: no hardcoded ET- prefix, matches arbitrary prefix by seq
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_find_issue_id_matches_arbitrary_prefix_by_sequence():
|
||||
"""ORCH-005 must resolve via the issue whose sequence_id == 5 (no ET- assumption)."""
|
||||
issues = {"results": [
|
||||
{"id": "uuid-a", "sequence_id": 3, "name": "something"},
|
||||
{"id": "uuid-b", "sequence_id": 5, "name": "ORCH-005: target"},
|
||||
{"id": "uuid-c", "sequence_id": 9, "name": "other"},
|
||||
]}
|
||||
# No DB row for this work_item_id => goes to the Plane API search branch.
|
||||
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
|
||||
found = plane_sync.find_issue_id("ORCH-005", project_id="proj-uuid")
|
||||
assert found == "uuid-b"
|
||||
|
||||
|
||||
def test_find_issue_id_matches_et_prefix_too():
|
||||
"""Backward compat: ET-002 still resolves by sequence_id == 2."""
|
||||
issues = {"results": [
|
||||
{"id": "uuid-x", "sequence_id": 2, "name": "ET item"},
|
||||
{"id": "uuid-y", "sequence_id": 7, "name": "other"},
|
||||
]}
|
||||
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
|
||||
found = plane_sync.find_issue_id("ET-002", project_id="proj-uuid")
|
||||
assert found == "uuid-x"
|
||||
99
tests/test_plane_author.py
Normal file
99
tests/test_plane_author.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Tests for per-agent Plane comment authorship (feat: per-agent bot author).
|
||||
|
||||
Covers:
|
||||
* _headers_for: role -> bot token; None/unknown/empty token -> shared fallback.
|
||||
* add_comment: author is propagated into the POST headers; no author keeps
|
||||
backward-compatible behaviour (shared orchestrator token).
|
||||
|
||||
GET/PATCH calls are intentionally NOT covered here: they stay on the shared
|
||||
token by design and are unchanged by this feature.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
# Set env defaults before importing app modules (same convention as the other
|
||||
# suites) so config/settings load cleanly without a real .env.
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "shared-token")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import patch, MagicMock # noqa: E402
|
||||
|
||||
from src import plane_sync # noqa: E402
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# _headers_for
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_headers_for_known_role_uses_bot_token():
|
||||
"""A known role with a configured token -> that bot's X-API-Key."""
|
||||
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"analyst": "analyst-tok"}, clear=False):
|
||||
assert plane_sync._headers_for("analyst") == {"X-API-Key": "analyst-tok"}
|
||||
|
||||
|
||||
def test_headers_for_none_falls_back_to_shared():
|
||||
"""author=None -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for(None) is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_unknown_role_falls_back_to_shared():
|
||||
"""Unknown role -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for("nope") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_empty_token_falls_back_to_shared():
|
||||
"""Known role but empty/unconfigured token -> shared orchestrator headers."""
|
||||
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"tester": ""}, clear=False):
|
||||
assert plane_sync._headers_for("tester") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_empty_string_author_falls_back_to_shared():
|
||||
"""author='' -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for("") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# add_comment
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _mock_post_ok():
|
||||
resp = MagicMock()
|
||||
resp.raise_for_status.return_value = None
|
||||
return resp
|
||||
|
||||
|
||||
def test_add_comment_with_author_posts_with_bot_headers():
|
||||
"""add_comment(author='developer') -> httpx.post called with the developer
|
||||
bot's X-API-Key header."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.dict(plane_sync.PLANE_BOT_TOKENS, {"developer": "dev-tok"}, clear=False), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello", author="developer")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] == {"X-API-Key": "dev-tok"}
|
||||
|
||||
|
||||
def test_add_comment_without_author_uses_shared_token():
|
||||
"""add_comment without author -> shared orchestrator headers (backward
|
||||
compatible)."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_add_comment_unknown_author_uses_shared_token():
|
||||
"""add_comment with an unknown role -> shared orchestrator headers."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello", author="ghost")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
|
||||
395
tests/test_stage_engine.py
Normal file
395
tests/test_stage_engine.py
Normal file
@@ -0,0 +1,395 @@
|
||||
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
|
||||
|
||||
These verify the MERGED behavior of what used to be two diverged
|
||||
_try_advance_stage implementations (launcher sync + plane async):
|
||||
|
||||
* happy-path advance for every stage launches the CORRECT agent
|
||||
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
|
||||
* a QG failure does not advance;
|
||||
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
|
||||
* developer retries > 3 -> telegram alert, no further enqueue;
|
||||
* tester FAIL -> rollback to development + enqueue developer;
|
||||
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
|
||||
* launcher AND plane both delegate to the engine.
|
||||
|
||||
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
|
||||
the engine runs against a real isolated sqlite DB.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
# Isolated test DB (same convention as the other suites).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import MagicMock, patch # noqa: E402
|
||||
|
||||
import src.db as _db # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import stage_engine # noqa: E402
|
||||
from src.stage_engine import advance_stage # noqa: E402
|
||||
from src.stages import get_agent_for_stage # noqa: E402
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(monkeypatch):
|
||||
"""Fresh isolated DB per test."""
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def silence_side_effects(monkeypatch):
|
||||
"""Mock all Plane/Telegram/notification side effects in the engine.
|
||||
|
||||
Everything imported into src.stage_engine that touches the network or sends
|
||||
a message becomes a no-op MagicMock so tests are deterministic and offline.
|
||||
"""
|
||||
for name in (
|
||||
"notify_stage_change",
|
||||
"notify_qg_failure",
|
||||
"notify_approve_requested",
|
||||
"send_telegram",
|
||||
"plane_notify_stage",
|
||||
"plane_notify_qg",
|
||||
"plane_add_comment",
|
||||
"set_issue_in_review",
|
||||
"set_issue_needs_input",
|
||||
"set_issue_in_progress",
|
||||
"set_issue_blocked",
|
||||
):
|
||||
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||
|
||||
|
||||
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(f"plane-{wi}", wi, repo, branch, stage),
|
||||
)
|
||||
task_id = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return task_id
|
||||
|
||||
|
||||
def _stage(task_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
conn.close()
|
||||
return row[0]
|
||||
|
||||
|
||||
def _jobs():
|
||||
conn = get_db()
|
||||
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def _add_developer_runs(task_id, n):
|
||||
conn = get_db()
|
||||
for _ in range(n):
|
||||
conn.execute(
|
||||
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def _pass(*a, **k):
|
||||
return (True, "ok")
|
||||
|
||||
|
||||
def _fail(reason):
|
||||
def _f(*a, **k):
|
||||
return (False, reason)
|
||||
return _f
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestHappyPathAgentSelection:
|
||||
"""The fixed agent-selection: when advancing FROM current_stage, the engine
|
||||
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
|
||||
"""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_stage,expected_next,expected_agent",
|
||||
[
|
||||
("architecture", "development", "developer"),
|
||||
("development", "review", "reviewer"),
|
||||
("review", "testing", "tester"),
|
||||
("testing", "deploy", "deployer"),
|
||||
],
|
||||
)
|
||||
def test_advance_launches_current_stage_agent(
|
||||
self, monkeypatch, current_stage, expected_next, expected_agent
|
||||
):
|
||||
# All QG checks pass for this happy-path suite.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task(current_stage)
|
||||
|
||||
res = advance_stage(
|
||||
task_id, current_stage, "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == expected_next
|
||||
assert _stage(task_id) == expected_next
|
||||
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
|
||||
assert expected_agent == get_agent_for_stage(current_stage)
|
||||
assert res.enqueued_agent == expected_agent
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == expected_agent
|
||||
|
||||
def test_deploy_to_done_no_agent(self, monkeypatch):
|
||||
"""deploy -> done advances but launches no agent (terminal-ish)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert res.advanced is True
|
||||
assert _stage(task_id) == "done"
|
||||
assert res.enqueued_agent is None
|
||||
assert _jobs() == []
|
||||
|
||||
def test_done_is_terminal(self):
|
||||
task_id = _make_task("done")
|
||||
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "done"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# QG failure: do not advance
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestQgFailureDoesNotAdvance:
|
||||
def test_qg_fail_keeps_stage(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.qg_passed is False
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert _jobs() == []
|
||||
|
||||
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
||||
)
|
||||
task_id = _make_task("development")
|
||||
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert stage_engine.notify_qg_failure.called
|
||||
assert stage_engine.plane_notify_qg.called
|
||||
|
||||
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
|
||||
"""finished_agent set -> NO generic QG notification (launcher parity)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert not stage_engine.notify_qg_failure.called
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestReviewerRequestChanges:
|
||||
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="reviewer")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "developer"
|
||||
|
||||
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
_add_developer_runs(task_id, 3) # already at the max
|
||||
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="reviewer")
|
||||
assert res.rolled_back_to == "development"
|
||||
assert res.alerted is True
|
||||
assert stage_engine.send_telegram.called
|
||||
# No new developer job enqueued past the retry cap.
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tester FAIL -> rollback to development + enqueue developer
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestTesterFail:
|
||||
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
|
||||
)
|
||||
task_id = _make_task("testing")
|
||||
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="tester")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "developer"
|
||||
|
||||
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
|
||||
)
|
||||
task_id = _make_task("testing")
|
||||
_add_developer_runs(task_id, 3)
|
||||
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="tester")
|
||||
assert res.rolled_back_to == "development"
|
||||
assert res.alerted is True
|
||||
assert stage_engine.set_issue_blocked.called
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestArchitectConflict:
|
||||
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
|
||||
# 10-conflict.md must exist in the worktree path the engine inspects.
|
||||
wt = tmp_path / "wt"
|
||||
conflict_dir = wt / "docs" / "work-items" / "ET-001"
|
||||
conflict_dir.mkdir(parents=True)
|
||||
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
|
||||
|
||||
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "analysis"
|
||||
assert _stage(task_id) == "analysis"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "analyst"
|
||||
|
||||
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
|
||||
wt = tmp_path / "wt"
|
||||
(wt / "docs").mkdir(parents=True)
|
||||
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to is None
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analyst approved-flow (analysis gate): never auto-advances
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestAnalysisApprovedFlow:
|
||||
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
|
||||
)
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="analyst")
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "analysis"
|
||||
assert stage_engine.set_issue_in_review.called
|
||||
assert stage_engine.notify_approve_requested.called
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# launcher + plane both delegate to the engine
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDelegation:
|
||||
def test_launcher_calls_engine(self):
|
||||
from src.agents.launcher import AgentLauncher
|
||||
task_id = _make_task("development", branch="feature/ET-777-deleg")
|
||||
with patch("src.stage_engine.advance_stage") as m:
|
||||
AgentLauncher()._try_advance_stage(
|
||||
run_id=1, agent="developer", repo="enduro-trails",
|
||||
branch="feature/ET-777-deleg",
|
||||
)
|
||||
m.assert_called_once()
|
||||
kwargs = m.call_args.kwargs
|
||||
assert kwargs["task_id"] == task_id
|
||||
assert kwargs["current_stage"] == "development"
|
||||
assert kwargs["finished_agent"] == "developer"
|
||||
|
||||
def test_plane_calls_engine(self):
|
||||
import asyncio
|
||||
from src.webhooks import plane as plane_mod
|
||||
with patch("src.stage_engine.advance_stage") as m:
|
||||
asyncio.run(
|
||||
plane_mod._try_advance_stage(
|
||||
task_id=5, current_stage="analysis", repo="enduro-trails",
|
||||
work_item_id="ET-001", branch="feature/ET-001-x",
|
||||
)
|
||||
)
|
||||
m.assert_called_once()
|
||||
# plane passes positional args; finished_agent (last positional) is None.
|
||||
args = m.call_args.args
|
||||
assert args[0] == 5
|
||||
assert args[1] == "analysis"
|
||||
assert args[-1] is None
|
||||
277
tests/test_webhook_dedup.py
Normal file
277
tests/test_webhook_dedup.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
|
||||
|
||||
A retried/replayed webhook delivery must be processed exactly once. We mock
|
||||
enqueue_job (imported into the gitea/plane module namespaces) and assert its
|
||||
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
|
||||
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
|
||||
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
|
||||
guarantee by re-enabling the secret.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
# Override DB path + project registry BEFORE importing app (same pattern as
|
||||
# tests/test_webhooks.py).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_GITEA_OWNER"] = "admin"
|
||||
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
|
||||
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||
)
|
||||
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import db as db_module # noqa: E402
|
||||
from src.webhooks import gitea as gitea_mod # noqa: E402
|
||||
from src.webhooks import plane as plane_mod # noqa: E402
|
||||
from src import projects as projects_mod # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db(monkeypatch):
|
||||
# settings is a process-wide singleton; another test module may have fixed
|
||||
# settings.db_path to its own file at import time. get_db() reads it live, so
|
||||
# pin it to OUR db for the duration of each test here.
|
||||
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def proj_registry():
|
||||
"""Pin the shared project registry to proj-1/enduro-trails.
|
||||
|
||||
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
|
||||
built at import; test_projects.py rebuilds it via reload_projects(), which can
|
||||
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
|
||||
ignore our fixtures. Force ours for each test, then rebuild after.
|
||||
"""
|
||||
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||
)
|
||||
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
|
||||
projects_mod.reload_projects()
|
||||
yield
|
||||
projects_mod.reload_projects()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def no_hmac(monkeypatch):
|
||||
"""Bypass HMAC so dedup behavior (not signing) is under test.
|
||||
|
||||
settings is shared, so override the secret on the module-level settings that
|
||||
each verify_* function reads.
|
||||
"""
|
||||
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
|
||||
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
|
||||
yield
|
||||
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def _events_count():
|
||||
conn = get_db()
|
||||
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_migration_adds_delivery_id_and_index():
|
||||
"""events has delivery_id + a partial unique index idx_events_delivery."""
|
||||
conn = get_db()
|
||||
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
|
||||
conn.close()
|
||||
assert "delivery_id" in cols
|
||||
assert "idx_events_delivery" in idxs
|
||||
|
||||
|
||||
def test_migration_on_old_db_without_column_does_not_crash():
|
||||
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
import sqlite3
|
||||
conn = sqlite3.connect(_test_db)
|
||||
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
|
||||
conn.executescript(
|
||||
"""
|
||||
CREATE TABLE events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT DEFAULT (datetime('now')),
|
||||
source TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
processed INTEGER DEFAULT 0
|
||||
);
|
||||
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
|
||||
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Should add the column + index without raising and keep the legacy rows.
|
||||
init_db()
|
||||
|
||||
conn = get_db()
|
||||
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
conn.close()
|
||||
assert "delivery_id" in cols
|
||||
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
|
||||
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
|
||||
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
|
||||
# Task at architecture so the ADR push would enqueue.
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
body = {
|
||||
"ref": "refs/heads/feature/ET-100-x",
|
||||
"repository": {"name": "enduro-trails"},
|
||||
"commits": [
|
||||
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
|
||||
],
|
||||
}
|
||||
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
|
||||
|
||||
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
# Same delivery id again -> duplicate, no new enqueue, no new event row.
|
||||
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||
assert r2.status_code == 200
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
|
||||
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
|
||||
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||
r1 = client.post("/webhook/gitea", json=body,
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
|
||||
r2 = client.post("/webhook/gitea", json=body,
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert r2.json()["status"] == "accepted"
|
||||
assert _events_count() == 2
|
||||
|
||||
|
||||
def test_gitea_fallback_hash_when_no_delivery_header():
|
||||
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
|
||||
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plane dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(plane_mod, "enqueue_job")
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
|
||||
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
|
||||
body = {
|
||||
"event": "work_item.created",
|
||||
"data": {
|
||||
"id": "pd-001",
|
||||
"name": "Dedup plane task",
|
||||
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
||||
"project": "proj-1",
|
||||
},
|
||||
}
|
||||
r1 = client.post("/webhook/plane", json=body)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
r2 = client.post("/webhook/plane", json=body)
|
||||
assert r2.status_code == 200
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert mock_enqueue.call_count == 1 # not re-enqueued
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
|
||||
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
|
||||
body = {
|
||||
"event": "work_item.created",
|
||||
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
|
||||
}
|
||||
r1 = client.post("/webhook/plane", json=body)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "ignored"
|
||||
# Event WAS logged (dedup happens before the project filter), so a retry of the
|
||||
# SAME body is a duplicate, not re-evaluated.
|
||||
assert _events_count() == 1
|
||||
r2 = client.post("/webhook/plane", json=body)
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HMAC still guarded (acceptance #4) — independent of the dedup path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_gitea_invalid_signature_still_401(monkeypatch):
|
||||
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
|
||||
r = client.post(
|
||||
"/webhook/gitea",
|
||||
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_plane_invalid_signature_still_401(monkeypatch):
|
||||
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
|
||||
r = client.post(
|
||||
"/webhook/plane",
|
||||
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
|
||||
headers={"X-Plane-Signature": "deadbeef"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
Reference in New Issue
Block a user