Compare commits

..

22 Commits

Author SHA1 Message Date
Dev Agent
d305521067 feat(plane): per-agent bot authorship for comments
add_comment now accepts an optional author (agent role) and POSTs under the matching Plane bot token via _headers_for(), so Plane shows the real author (Analyst/Architect/Developer/Reviewer/Tester/Deployer/Stream) instead of a single shared account. Unknown/empty roles or missing tokens fall back to the shared orchestrator token (autonomy preserved). GET/PATCH (find_issue_id, set_state) are unchanged and stay on the shared token. Call sites in stage_engine, launcher, webhooks/plane and the plane_sync notify helpers now pass author by stage role; stage transitions use stream. Adds tests/test_plane_author.py.
2026-06-03 10:53:25 +03:00
Dev Agent
30d6dd0557 feat(config): add per-agent Plane bot token settings
Add 7 optional bot-token fields (plane_bot_analyst..stream) read from the ORCH_PLANE_BOT_* env vars, default empty. Required for per-agent comment authorship; empty values fall back to the shared orchestrator token.
2026-06-03 10:53:17 +03:00
12e2691a24 Merge pull request 'M-6: derive work_item_id from Plane sequence_id' (#8) from feature/ORCH-M6-plane-sequence into main 2026-06-03 10:04:32 +03:00
Dev Agent
c431a3d055 fix(plane_sync): drop hardcoded ET- prefix in find_issue_id (M-6) 2026-06-03 10:02:15 +03:00
Dev Agent
1d978caea7 feat(webhook): derive work_item_id from Plane sequence_id (M-6) 2026-06-03 10:02:15 +03:00
be27f506e3 Merge pull request 'ORCH cleanup L-1/L-2/L-3: stages comment, prune run logs, emoji constants' (#7) from feature/ORCH-cleanup-L1L2L3 into main 2026-06-03 09:55:53 +03:00
Dev Agent
8f11971bfc refactor(plane_sync): extract emoji literals to constants (L-3) 2026-06-03 09:54:43 +03:00
Dev Agent
0653c2437f feat(launcher): prune old run logs (L-2) 2026-06-03 09:53:55 +03:00
Dev Agent
48b7707eb3 docs(stages): fix misleading STAGE_TRANSITIONS comment (L-1) 2026-06-03 09:51:46 +03:00
2fdc6856ba Merge pull request 'ORCH-5: webhook delivery dedup (M-7)' (#6) from feature/ORCH-5-webhook-dedup into main 2026-06-03 09:20:38 +03:00
Dev Agent
4ac449ff63 test(webhook): cover delivery dedup + migration safety (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
e6a7c6de8d feat(webhook): dedup deliveries by delivery_id (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
0b924208dc feat(db): add events.delivery_id + partial unique index (M-7) 2026-06-03 09:18:02 +03:00
2f0fd24670 Merge pull request 'ORCH-4: unified stage-engine (M-3)' (#5) from feature/ORCH-4-stage-engine into main 2026-06-03 08:59:51 +03:00
Dev Agent
6abdc220d2 test(stage): cover unified stage_engine + launcher/plane delegation
18 tests: happy-path advance per stage with correct agent (ORCH-4 fix),
QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL
rollback+retry/block, architect conflict rollback to analysis, analyst
approved-flow no-advance, and launcher+plane both delegating to the engine.
2026-06-03 08:56:25 +03:00
Dev Agent
51401a3ba9 refactor(launcher,plane): delegate stage advance to stage_engine
launcher._try_advance_stage and plane._try_advance_stage are now thin
wrappers over stage_engine.advance_stage. The plane webhook calls the sync
engine via asyncio.to_thread so there is exactly one implementation. The
launcher forwards finished_agent so the agent-specific rollback branches still
fire; the webhook passes None (human :approved:), matching prior behavior.

Also fixes the agent-selection bug in the launcher path: it used to enqueue
get_agent_for_stage(next_stage) (skipping a stage, e.g. analysis->architecture
launched developer instead of architect). The unified engine uses
get_agent_for_stage(current_stage), consistent with plane and gitea.
2026-06-03 08:56:25 +03:00
Dev Agent
0befc49b1e refactor(stage): extract unified stage_engine.advance_stage (M-3)
Merge the two diverged _try_advance_stage implementations (launcher sync +
plane async) into one synchronous engine. Preserves all launcher business
logic (analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester
FAIL rollback+retry, architect conflict rollback) and the plane
check_review_approved PR-by-branch dispatch. Unifies the QG signature
dispatch. Fixes agent selection: advancing FROM current_stage launches
get_agent_for_stage(current_stage), not next_stage.
2026-06-03 08:56:14 +03:00
fd554c8a5a Merge pull request 'ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout)' (#4) from feature/ORCH-7-hardening into main 2026-06-03 08:31:26 +03:00
Dev Agent
c167c6930d test(launcher): watchdog graceful kill ordering + timeout config + M-4 removal
Cover M-2: SIGTERM-before-SIGKILL ordering, graceful exit within grace skips
SIGKILL, ProcessLookupError before SIGTERM is tolerated (no _record_kill), and
_resolve_timeout per-agent override / default / malformed-JSON fallback.
Cover M-4: _auto_merge_pr removed, _ensure_pr retained.
2026-06-03 08:28:09 +03:00
Dev Agent
49ecb48eb0 feat(launcher): graceful SIGTERM->SIGKILL + configurable agent timeout (M-2)
The watchdog used to time.sleep(timeout) then immediately SIGKILL, which cut
claude off mid-write and left half-written artifacts. It now sends SIGTERM,
polls os.kill(pid, 0) for up to agent_kill_grace_seconds, and only SIGKILL if
the process is still alive; ProcessLookupError is tolerated at every step.

Timeout is now configurable via config.py: agent_timeout_seconds (default 1800),
agent_kill_grace_seconds (default 20), and agent_timeout_overrides_json for
per-agent overrides (e.g. {"reviewer": 3600}). AGENT_TIMEOUT is kept as a
backward-compatible alias. The recorded exit_code stays -9 so the ORCH-1
monitor retry/fail logic is unchanged (timeout-kills classify as permanent and
requeue within max_attempts, no retry loop).
2026-06-03 08:28:03 +03:00
Dev Agent
237732bc64 refactor(launcher): remove dead _auto_merge_pr (M-4)
_auto_merge_pr had zero callers (merge is handled by the deployer agent).
Removed the method; _ensure_pr (still used by the auto-advance path) is kept.
2026-06-03 08:27:52 +03:00
4e52e192e4 Merge pull request 'ORCH-1 (F-2b): persistent job queue instead of in-process daemon threads' (#3) from feature/ORCH-1-job-queue into main 2026-06-03 08:09:23 +03:00
16 changed files with 2106 additions and 349 deletions

View File

@@ -1,8 +1,10 @@
import subprocess
import os
import json
import logging
import threading
import signal
import time
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
@@ -14,6 +16,62 @@ from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment
logger = logging.getLogger("orchestrator.launcher")
def prune_run_logs(runs_dir, keep_days=30, keep_max=500, active_paths=None):
"""L-2: best-effort rotation of per-run logs (<runs_dir>/*.log).
A log file is removed if it is older than keep_days OR it is not within the
keep_max most-recent logs (whichever condition is met first). Only *.log
files directly inside runs_dir are considered; non-.log files and
subdirectories are never touched. Files whose path is in active_paths (the
currently running log) are always kept.
Returns the number of files removed. Never raises: any error is logged and
swallowed so log rotation can never bring the app down.
"""
removed = 0
try:
active = set()
for ap in (active_paths or []):
try:
active.add(os.path.realpath(ap))
except Exception:
active.add(ap)
if not os.path.isdir(runs_dir):
return 0
logs = []
for name in os.listdir(runs_dir):
if not name.endswith(".log"):
continue
path = os.path.join(runs_dir, name)
if not os.path.isfile(path):
continue
if os.path.realpath(path) in active:
continue
try:
mtime = os.path.getmtime(path)
except OSError:
continue
logs.append((path, mtime))
logs.sort(key=lambda t: t[1], reverse=True)
cutoff = time.time() - keep_days * 86400
for idx, (path, mtime) in enumerate(logs):
too_old = mtime < cutoff
over_max = idx >= keep_max
if too_old or over_max:
try:
os.remove(path)
removed += 1
except OSError as e:
logger.warning(f"prune_run_logs: failed to remove {path}: {e}")
except Exception as e:
logger.warning(f"prune_run_logs failed for {runs_dir}: {e}")
return removed
class AgentLauncher:
"""Launch Claude CLI agents directly (binary mounted into container)."""
@@ -53,7 +111,10 @@ class AgentLauncher:
}
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
AGENT_TIMEOUT = 1800 # 30 minutes
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
# backward-compatible alias for the default; the actual value (and per-agent
# overrides) live in settings and are resolved via _resolve_timeout().
AGENT_TIMEOUT = settings.agent_timeout_seconds
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
"""
@@ -190,7 +251,7 @@ class AgentLauncher:
t = threading.Thread(
target=self._watchdog,
args=(proc.pid, run_id),
kwargs={"job_id": job_id},
kwargs={"job_id": job_id, "agent": agent},
daemon=True,
)
t.start()
@@ -209,29 +270,100 @@ class AgentLauncher:
notify_agent_started(run_id, agent, task_id)
return run_id
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
"""Kill agent if it exceeds timeout.
@staticmethod
def _resolve_timeout(agent: str = None) -> int:
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
like {"reviewer": 3600}) wins; otherwise the global default
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
(falls back to the default) and only logged, so a bad env never bricks runs.
"""
default = settings.agent_timeout_seconds
raw = (settings.agent_timeout_overrides_json or "").strip()
if agent and raw:
try:
overrides = json.loads(raw)
if isinstance(overrides, dict) and agent in overrides:
return int(overrides[agent])
except (ValueError, TypeError) as e:
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
return default
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
job_id: int = None, agent: str = None):
"""Kill agent if it exceeds its timeout.
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
code and drives the job retry/fail logic, so the watchdog itself only needs
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
to terminate the process and record the agent_runs exit. job_id is accepted
for symmetry.
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
give the process up to settings.agent_kill_grace_seconds to flush and exit on
its own, and only SIGKILL if it is still alive after the grace window. If the
process exits during the grace window, SIGKILL is NOT sent.
ProcessLookupError is tolerated at every step (the process may already be
gone). The recorded exit_code stays -9 to match the existing retry/fail
contract regardless of which signal actually reaped it.
"""
import time
if timeout is None:
timeout = self.AGENT_TIMEOUT
timeout = self._resolve_timeout(agent)
time.sleep(timeout)
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
try:
os.kill(pid, signal.SIGTERM)
logger.warning(
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
)
except ProcessLookupError:
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
return # nothing to record: the monitor's proc.wait() owns the exit
# Phase 2: poll for graceful exit within the grace window.
grace = settings.agent_kill_grace_seconds
poll_interval = 0.5
waited = 0.0
while waited < grace:
time.sleep(poll_interval)
waited += poll_interval
try:
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
except ProcessLookupError:
logger.info(
f"Agent run_id={run_id} exited gracefully after SIGTERM "
f"({waited:.1f}s); no SIGKILL needed"
)
self._record_kill(run_id)
return
# Phase 3: still alive -> hard SIGKILL.
try:
os.kill(pid, signal.SIGKILL)
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
logger.warning(
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
)
conn.commit()
conn.close()
except ProcessLookupError:
pass # Already finished
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
self._record_kill(run_id)
@staticmethod
def _record_kill(run_id: int):
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
"""
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
)
conn.commit()
conn.close()
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
"""Wait for agent to finish, commit+push results, update DB.
@@ -339,7 +471,8 @@ class AgentLauncher:
set_issue_blocked(_wid)
plane_add_comment(
_wid,
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="deployer",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")
@@ -488,7 +621,15 @@ class AgentLauncher:
pass
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
"""After agent finishes successfully, check QG and advance stage if possible."""
"""After agent finishes successfully, advance the stage via the unified engine.
ORCH-4 / M-3: the 174-line body that used to live here moved into
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
the task by (repo, branch) and delegates. `agent` is forwarded as
finished_agent so the analyst/reviewer/tester/architect rollback branches
still trigger exactly as before. The agent-selection bug (it used to call
get_agent_for_stage(next_stage)) is fixed inside the engine.
"""
try:
conn = get_db()
task_row = conn.execute(
@@ -500,174 +641,15 @@ class AgentLauncher:
return
task_id, current_stage, work_item_id = task_row
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
if not next_stage:
return
# Run QG check if defined
if qg_name and qg_name in QG_CHECKS:
check_fn = QG_CHECKS[qg_name]
if qg_name in ("check_analysis_approved",):
# Requires human approval - post request comment if analyst just finished
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
files_check = QG_CHECKS.get("check_analysis_complete")
if files_check:
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review
from ..plane_sync import set_issue_in_review
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
)
notify_approve_requested(task_id)
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
else:
# Check if questions file exists (in the task worktree)
import os as _os
questions_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md"
)
if _os.path.isfile(questions_path):
# Analyst has questions -> Needs Input
from ..plane_sync import set_issue_needs_input
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
)
from ..notifications import send_telegram
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
else:
# No artifacts and no questions
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
)
return
elif qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) signature — already worktree-aware.
passed, reason = check_fn(repo, branch)
elif qg_name == "check_tests_passed":
# Artifact check — pass branch so it reads from the worktree.
passed, reason = check_fn(repo, work_item_id or "", branch)
else:
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
passed, reason = check_fn(repo, work_item_id or "", branch)
if not passed:
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
# If reviewer says REQUEST_CHANGES, rollback to development
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
# Count retries
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
logger.error(f"Task {task_id}: max retries reached")
# Task 6: Tester FAIL -> rollback to development
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
)
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
# Task 8: Architect conflict -> rollback to analysis
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
import os as _os
conflict_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md"
)
if _os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
return
return
elif qg_name:
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
# Launch next agent if defined
next_agent = get_agent_for_stage(next_stage)
if next_agent:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
from ..stage_engine import advance_stage
advance_stage(
task_id=task_id,
current_stage=current_stage,
repo=repo,
work_item_id=work_item_id,
branch=branch,
finished_agent=agent,
)
except Exception as e:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
@@ -703,47 +685,6 @@ class AgentLauncher:
logger.error(f"Failed to create PR for {branch}: {e}")
return None
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
base_url = f"{settings.gitea_url}/api/v1"
try:
resp = httpx.get(
f"{base_url}/repos/{owner}/{repo}/pulls",
params={"state": "open", "head": branch},
headers=headers, timeout=10
)
resp.raise_for_status()
prs = resp.json()
if not prs:
pr_number = self._ensure_pr(repo, branch, 0)
if not pr_number:
return False
else:
pr_number = prs[0]["number"]
resp = httpx.post(
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
json={"Do": "merge"},
headers=headers, timeout=30
)
if resp.status_code in (200, 204):
logger.info(f"PR #{pr_number} merged for {branch}")
update_task_stage(task_id, "done")
notify_stage_change(task_id, "deploy", "done")
plane_notify_stage(work_item_id, "deploy", "done")
from ..notifications import send_telegram
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
return True
else:
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
return False
except Exception as e:
logger.error(f"Auto-merge failed for {branch}: {e}")
return False
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
"""Write task file directly into the task's worktree.

View File

@@ -9,6 +9,17 @@ class Settings(BaseSettings):
plane_webhook_secret: str = ""
plane_project_id: str = ""
# Per-agent Plane bot tokens (feat: per-agent comment authorship).
# When set, add_comment posts under the matching bot so Plane shows the
# real author (Analyst/Architect/...). Empty -> fallback to plane_api_token.
plane_bot_analyst: str = ""
plane_bot_architect: str = ""
plane_bot_developer: str = ""
plane_bot_reviewer: str = ""
plane_bot_tester: str = ""
plane_bot_deployer: str = ""
plane_bot_stream: str = ""
# Gitea
gitea_url: str = "http://localhost:3000"
gitea_token: str = ""
@@ -53,6 +64,28 @@ class Settings(BaseSettings):
breaker_threshold: int = 3
breaker_pause_seconds: int = 300
# ORCH-7 (M-2): agent timeout + graceful kill.
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
# flush artifacts before the hard kill
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
# agent_timeout_overrides_json -> optional per-agent override JSON object,
# e.g. {"reviewer": 3600, "architect": 2700}
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
agent_timeout_seconds: int = 1800
agent_kill_grace_seconds: int = 20
agent_timeout_overrides_json: str = ""
# L-2: run-log rotation. Old per-run logs in <data>/runs/*.log are pruned at
# app startup (best-effort). A *.log is removed if it is older than
# log_keep_days OR not within the log_keep_max most-recent logs (whichever
# hits first). Only *.log files are touched; the active run log is skipped.
# log_keep_days -> max age in days (env ORCH_LOG_KEEP_DAYS).
# log_keep_max -> max number of newest logs to retain (env ORCH_LOG_KEEP_MAX).
log_keep_days: int = 30
log_keep_max: int = 500
# Telegram notifications
telegram_bot_token: str = ""

View File

@@ -67,6 +67,17 @@ def init_db():
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
# (which have NULL delivery_id) never collide with each other. Restart-safe:
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
_ensure_column(conn, "events", "delivery_id", "TEXT")
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
)
conn.commit()
conn.close()
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------
def insert_event_dedup(
source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
"""Idempotently log a webhook event keyed by delivery_id.
Returns True if a NEW row was inserted (caller should dispatch the event) and
False if this delivery_id was already present (a duplicate delivery -> caller
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
index idx_events_delivery; rowcount==1 means the row was actually inserted.
"""
conn = get_db()
try:
cur = conn.execute(
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
"VALUES (?, ?, ?, ?)",
(source, event_type, payload, delivery_id),
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------

View File

@@ -60,6 +60,22 @@ async def lifespan(app: FastAPI):
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
from .config import settings as _settings
from .agents.launcher import prune_run_logs
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
_removed = prune_run_logs(
_runs_dir,
keep_days=_settings.log_keep_days,
keep_max=_settings.log_keep_max,
)
if _removed:
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
except Exception as e:
log.warning(f"Log rotation skipped: {e}")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()

View File

@@ -6,9 +6,53 @@ from .config import settings
logger = logging.getLogger("orchestrator.plane_sync")
# L-3: emoji literals used in Plane comment bodies, named for readability.
# Message text stays byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504" # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
EMOJI_DONE = "\u2705" # task completed
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
WORKSPACE = settings.plane_workspace_slug
# feat(plane): per-agent comment authorship.
# Map an agent role -> its dedicated Plane bot token (read from config / env).
# When the token is present, add_comment() POSTs under that bot so Plane shows
# the real author. Empty/unknown role -> fallback to the shared orchestrator
# token (PLANE_HEADERS), so commenting stays autonomous.
PLANE_BOT_TOKENS = {
"analyst": settings.plane_bot_analyst,
"architect": settings.plane_bot_architect,
"developer": settings.plane_bot_developer,
"reviewer": settings.plane_bot_reviewer,
"tester": settings.plane_bot_tester,
"deployer": settings.plane_bot_deployer,
"stream": settings.plane_bot_stream,
}
# Map a pipeline stage -> the agent role that owns work in that stage. Used to
# pick an author for rollback/stage notifications targeting a specific stage.
STAGE_AUTHORS = {
"analysis": "analyst",
"architecture": "architect",
"development": "developer",
"review": "reviewer",
"testing": "tester",
"deploy": "deployer",
}
def _headers_for(author: str | None) -> dict:
"""Return X-API-Key headers for the given agent role.
Falls back to the shared orchestrator token (PLANE_HEADERS /
settings.plane_api_token) when the role is None, unknown, or its bot token
is not configured. This keeps comment posting autonomous: a comment is
always written, just attributed to the orchestrator if no bot is set.
"""
tok = PLANE_BOT_TOKENS.get(author or "") if author else None
return {"X-API-Key": tok} if tok else PLANE_HEADERS
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
@@ -65,6 +109,24 @@ STAGE_TO_STATE = {
}
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
"""M-6: GET the Plane issue by UUID and return its sequence_id (the
authoritative per-project number), or None if unavailable.
Returns None on network error, non-2xx, or a missing field - never raises,
so the webhook handler can fall back to DB increment and stay autonomous.
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
seq = resp.json().get("sequence_id")
return int(seq) if seq is not None else None
except Exception as e:
logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
return None
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)
@@ -89,25 +151,26 @@ def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
resp.raise_for_status()
data = resp.json()
results = data.get("results", data if isinstance(data, list) else [])
# M-6: match by sequence_id directly (the authoritative per-project
# number), parsed from the work_item_id suffix - no hardcoded prefix.
try:
target_num = int(work_item_id.rsplit("-", 1)[1])
except (IndexError, ValueError):
target_num = None
for issue in results:
seq = issue.get("sequence_id")
identifier = f"ET-{seq:03d}" if seq else ""
if identifier == work_item_id or work_item_id in issue.get("name", ""):
if target_num is not None and issue.get("sequence_id") == target_num:
return issue["id"]
# Fallback: get all issues and match by sequence_id number
if work_item_id.startswith("ET-"):
try:
target_num = int(work_item_id.split("-")[1])
except (IndexError, ValueError):
target_num = None
if target_num:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
if work_item_id in issue.get("name", ""):
return issue["id"]
# Fallback: get all issues and match by sequence_id number (any prefix)
if target_num is not None:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
except Exception as e:
logger.error(f"Failed to find issue for {work_item_id}: {e}")
return None
@@ -134,8 +197,14 @@ def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
def add_comment(work_item_id: str, text: str, project_id: str = None):
"""Add a comment to Plane issue."""
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
"""Add a comment to a Plane issue.
feat(plane): when ``author`` (an agent role) maps to a configured bot
token, the comment is POSTed under that bot so Plane shows the real author.
Otherwise it falls back to the shared orchestrator token (see
``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.
"""
project_id = _resolve_project_id(work_item_id, project_id)
issue_id = find_issue_id(work_item_id, project_id)
if not issue_id:
@@ -145,9 +214,9 @@ def add_comment(work_item_id: str, text: str, project_id: str = None):
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
html = f"<p>{text}</p>"
try:
resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10)
resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
resp.raise_for_status()
logger.info(f"Plane: comment added to {work_item_id}")
logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
except Exception as e:
logger.error(f"Failed to add comment to {work_item_id}: {e}")
@@ -194,7 +263,7 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, new_stage, project_id)
msg = f"🔄 Stage: {old_stage}{new_stage}"
msg = f"{EMOJI_STAGE} Stage: {old_stage}{new_stage}"
if agent:
msg += f" (launching {agent})"
@@ -227,16 +296,29 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
except Exception:
pass
add_comment(work_item_id, msg, project_id)
# Stage transition is the orchestrator's own voice -> attribute to stream.
add_comment(work_item_id, msg, project_id, author="stream")
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
"""Notify Plane about QG failure."""
add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check}{reason}", project_id)
# QG failure belongs to the agent that owns the failing stage.
add_comment(
work_item_id,
f"{EMOJI_QG_FAIL} QG failed at {stage}: {check}{reason}",
project_id,
author=STAGE_AUTHORS.get(stage, "stream"),
)
def notify_done(work_item_id: str, project_id: str = None):
"""Mark issue as Done in Plane."""
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, "done", project_id)
add_comment(work_item_id, "✅ Task completed! PR merged and deployed.", project_id)
# Deploy finished the task -> attribute the completion comment to Deployer.
add_comment(
work_item_id,
f"{EMOJI_DONE} Task completed! PR merged and deployed.",
project_id,
author="deployer",
)

430
src/stage_engine.py Normal file
View File

@@ -0,0 +1,430 @@
"""Unified stage engine (ORCH-4 / M-3).
Single source of truth for "an agent finished / a human approved -> run the
stage's quality gate and either advance the pipeline or roll it back".
Before ORCH-4 this logic was duplicated in two places that had silently
diverged:
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
rollback+retry, architect conflict rollback) — but it picked the next agent
with get_agent_for_stage(next_stage), which is WRONG.
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
check_review_approved PR-by-branch dispatch and used the CORRECT
get_agent_for_stage(current_stage)).
This module merges both into one sync `advance_stage(...)`. launcher calls it
directly; the plane webhook calls it through asyncio.to_thread so there is
exactly one implementation.
Agent-selection bug fix (ORCH-4):
stages.py defines `agent` as "the agent to launch when advancing FROM this
stage". So when advancing current -> next, the correct agent to launch is
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
stage (e.g. analysis->architecture launched 'developer' instead of
'architect'). plane and gitea already used current_stage; we unify on that.
"""
import logging
import os
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .qg.checks import QG_CHECKS
from .notifications import (
notify_stage_change,
notify_qg_failure,
notify_approve_requested,
send_telegram,
)
from .plane_sync import (
notify_stage_change as plane_notify_stage,
notify_qg_failure as plane_notify_qg,
add_comment as plane_add_comment,
set_issue_in_review,
set_issue_needs_input,
set_issue_in_progress,
set_issue_blocked,
)
from .config import settings
logger = logging.getLogger("orchestrator.stage_engine")
MAX_DEVELOPER_RETRIES = 3
@dataclass
class AdvanceResult:
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
advanced: bool = False
from_stage: str | None = None
to_stage: str | None = None
enqueued_agent: str | None = None
enqueued_job_id: int | None = None
qg_name: str | None = None
qg_passed: bool | None = None
qg_reason: str | None = None
rolled_back_to: str | None = None
alerted: bool = False
note: str | None = None
notes: list = field(default_factory=list)
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
"""Dispatch a quality-gate check to the right signature and run it.
Signatures (unified from launcher + plane):
- check_ci_green / check_tests_local -> (repo, branch)
- check_review_approved -> (repo, pr_number) [PR found by branch]
- everything else (artifact checks) -> (repo, work_item_id, branch)
Returns (passed: bool, reason: str).
"""
check_fn = QG_CHECKS.get(qg_name)
if not check_fn:
logger.error(f"QG function '{qg_name}' not found in registry")
return False, f"Unknown QG: {qg_name}"
if qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) — already worktree-aware.
return check_fn(repo, branch)
if qg_name == "check_review_approved":
# Special case kept from plane: find the open PR for this branch via
# Gitea, then check it; fall back to a file-based review marker.
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
# check reads from the task worktree (ORCH-2 / S-4).
return check_fn(repo, work_item_id or "", branch)
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
"""check_review_approved dispatch preserved from plane._try_advance_stage.
Finds the open PR whose head ref == branch via the Gitea API and runs
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
file-based review marker (12-review.md / 09-review.md) like the original.
"""
import httpx as _httpx
owner = settings.gitea_owner
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
headers = {"Authorization": f"token {settings.gitea_token}"}
try:
resp = _httpx.get(url, headers=headers, timeout=10)
prs = resp.json()
pr_number = None
for pr in prs:
if pr.get("head", {}).get("ref") == branch:
pr_number = pr["number"]
break
if pr_number:
return check_fn(repo, pr_number)
# No open PR but a review file may exist — check file-based.
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = os.path.join(settings.repos_dir, repo)
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(review_path) or os.path.isfile(review_path2):
return True, "Review file exists (file-based approval)"
return False, "No open PR found and no review file"
except Exception as e:
return False, f"Error finding PR: {e}"
def _developer_retry_count(task_id: int) -> int:
"""How many developer runs have already happened for this task."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,),
).fetchone()[0]
conn.close()
return n
def advance_stage(
task_id: int,
current_stage: str,
repo: str,
work_item_id: str,
branch: str,
finished_agent: str | None = None,
) -> AdvanceResult:
"""Run the current stage's quality gate and advance / roll back the pipeline.
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
the async plane webhook calls it via asyncio.to_thread.
Args:
task_id: tasks.id
current_stage: the stage the task is currently in
repo: repository name
work_item_id: Plane work item id (may be "" / None)
branch: feature branch
finished_agent: the agent that just finished (launcher path). Drives the
approved/REQUEST_CHANGES/tester/architect branches. In the
plane webhook path it is None, so those agent-specific
branches simply do not trigger (matches old plane behavior).
Returns AdvanceResult describing what happened.
"""
result = AdvanceResult(from_stage=current_stage)
agent = finished_agent
try:
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
result.qg_name = qg_name
result.to_stage = next_stage
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
result.note = "terminal"
return result
# --- Quality gate ----------------------------------------------------
if qg_name and qg_name in QG_CHECKS:
# Human-approval gate: special analyst approved-flow (launcher only).
if qg_name == "check_analysis_approved":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
if not passed:
logger.info(
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
elif qg_name:
# QG name set but not registered — do not advance (launcher behavior).
result.note = f"qg '{qg_name}' not in registry"
return result
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
result.advanced = True
logger.info(
f"Task {task_id}: {current_stage} -> {next_stage} "
f"(auto-advance after {agent})"
)
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
next_agent = get_agent_for_stage(current_stage)
if next_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\n"
f"Branch: {branch}\nStage: {next_stage}"
)
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
result.enqueued_agent = next_agent
result.enqueued_job_id = new_job_id
logger.info(
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
)
return result
except Exception as e:
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
result.note = f"error: {e}"
return result
def _handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
):
"""Analyst approved-flow (launcher only).
Only triggers when the analyst just finished (agent == 'analyst') in the
launcher path. Decides between: artifacts ready -> In Review + request
:approved:; questions file -> Needs Input; otherwise a warning comment.
This gate never advances on its own (human approval does that via the plane
webhook), matching the original launcher behavior.
"""
result.qg_name = "check_analysis_approved"
result.note = "analysis-approval-gate"
if not (agent == "analyst" and work_item_id):
return
files_check = QG_CHECKS.get("check_analysis_complete")
if not files_check:
return
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review, ask for :approved:.
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
author="analyst",
)
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested :approved: in Plane"
)
return
questions_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md",
)
if os.path.isfile(questions_path):
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
author="analyst",
)
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
result.note = "analysis-needs-input"
return
# No artifacts and no questions.
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
author="analyst",
)
result.note = "analysis-empty"
def _handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result: AdvanceResult,
):
"""All rollback/retry branches from the original launcher, preserved verbatim.
Only fire on the launcher path (finished_agent is set). The webhook path
passes finished_agent=None, so none of these agent-specific branches trigger
— that matches the old plane behavior (it just reported the QG failure).
"""
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
f"(job_id={new_job})"
)
else:
send_telegram(
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
f"Manual intervention needed."
)
result.alerted = True
logger.error(f"Task {task_id}: max retries reached")
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
if agent == "tester" and qg_name == "check_tests_passed":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="tester",
)
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
)
else:
set_issue_blocked(work_item_id)
send_telegram(
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
f"retries. Manual intervention needed."
)
result.alerted = True
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
if agent == "architect" and qg_name == "check_architecture_done":
conflict_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md",
)
if os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
result.rolled_back_to = "analysis"
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
author="architect",
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
result.enqueued_agent = "analyst"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: architect conflict, enqueued analyst "
f"(job_id={new_job})"
)

View File

@@ -5,7 +5,7 @@ Stages:
Each stage defines:
- next: the stage to advance to
- agent: the agent to launch when entering the NEXT stage
- agent: the agent to launch when advancing FROM this stage (NOT the next stage's agent)
- qg: the quality gate check required to leave this stage
"""

52
src/webhooks/_dedup.py Normal file
View File

@@ -0,0 +1,52 @@
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
manual replay. Without idempotency a retried delivery re-enters the pipeline and
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
repo). This module computes a stable per-delivery id so the webhook handlers can
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
body (a retried identical body yields the same hash).
"""
import hashlib
def _sha256_hex(*parts: str) -> str:
h = hashlib.sha256()
for p in parts:
h.update(p.encode("utf-8", "replace"))
return h.hexdigest()
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
"""Compute the delivery_id for a Gitea webhook.
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
sha256(source + event_type + body) so a retried identical body still maps to
one id even if Gitea omitted the header.
"""
raw = (headers.get("X-Gitea-Delivery") or "").strip()
if not raw:
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
return f"gitea:{raw}"
def plane_delivery_id(headers, body: bytes) -> str:
"""Compute the delivery_id for a Plane webhook.
Plane does not reliably send a delivery header, so we try a couple of common
names and otherwise fall back to sha256("plane" + body): a retried identical
body yields the same id.
"""
raw = (
headers.get("X-Plane-Delivery")
or headers.get("X-Hook-Delivery")
or ""
).strip()
if not raw:
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
return f"plane:{raw}"

View File

@@ -10,7 +10,14 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import gitea_delivery_id
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
# Runs AFTER HMAC verification above.
event_type = request.headers.get("X-Gitea-Event", "unknown")
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("gitea", event_type, body.decode()),
)
conn.commit()
conn.close()
delivery_id = gitea_delivery_id(request.headers, event_type, body)
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
if event_type == "push":
await handle_push(payload)

View File

@@ -15,7 +15,9 @@ from ..db import (
get_next_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("plane", payload.get("event", "unknown"), body.decode()),
)
conn.commit()
conn.close()
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
# project still falls through to the filter below and returns {"status":"ignored"}.
event_type = payload.get("event", "unknown")
delivery_id = plane_delivery_id(request.headers, body)
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
event = payload.get("event")
action = payload.get("action", "")
@@ -148,8 +154,20 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
logger.info(f"QG-0 failed for {plane_id}: {errors}")
return
# Generate work item ID
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
# Generate work item ID.
# M-6: source of truth for the number is the Plane sequence_id. Fetch it by
# issue UUID; if Plane is unavailable, fall back to the DB increment so a
# Plane outage never blocks task creation (autonomy > exact numbering).
from ..plane_sync import fetch_issue_sequence_id
seq = fetch_issue_sequence_id(plane_id, plane_project_id)
if seq is not None:
work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
else:
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
logger.warning(
f"Plane sequence_id unavailable for {plane_id}, "
f"fell back to DB increment: {work_item_id}"
)
# Create slug from name
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
@@ -191,7 +209,7 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
except Exception as e:
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
@@ -234,7 +252,7 @@ async def handle_comment(data: dict, project_id: str = ""):
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
else:
# Rollback to previous stage
@@ -245,8 +263,12 @@ async def handle_comment(data: dict, project_id: str = ""):
set_issue_in_progress(work_item_id)
notify_stage_change(task_id, current_stage, prev_stage)
plane_notify_stage(work_item_id, current_stage, prev_stage)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}")
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
_plane_comment(
work_item_id,
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
author=STAGE_AUTHORS.get(prev_stage, "stream"),
)
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
return
@@ -292,7 +314,8 @@ async def handle_comment(data: dict, project_id: str = ""):
_pc(
work_item_id,
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430."
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
author="analyst",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
@@ -308,7 +331,7 @@ async def handle_comment(data: dict, project_id: str = ""):
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
@@ -318,81 +341,30 @@ async def handle_comment(data: dict, project_id: str = ""):
async def _try_advance_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):
"""Run QG check for current stage and advance if passed."""
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
return
The QG dispatch (including the check_review_approved PR-by-branch logic) and
the advance/launch logic now live in src/stage_engine.advance_stage(), which
is synchronous. We run it off the event loop via asyncio.to_thread so there
is exactly one implementation shared with the launcher.
# Run QG check if one is required
if qg_name:
qg_func = QG_CHECKS.get(qg_name)
if not qg_func:
logger.error(f"QG function '{qg_name}' not found in registry")
return
finished_agent is None on this webhook path (a human :approved: comment, not
a finished agent), so the agent-specific rollback branches inside the engine
intentionally do not trigger — identical to the old plane behavior, which
only ran the QG and either advanced or reported the failure.
"""
import asyncio
from ..stage_engine import advance_stage
# Determine args based on QG function
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
passed, reason = qg_func(repo, work_item_id, branch)
elif qg_name in ("check_ci_green", "check_tests_local"):
passed, reason = qg_func(repo, branch)
elif qg_name == "check_review_approved":
# Find PR number by branch via Gitea API
import httpx as _httpx
from ..config import settings as _s
_owner = _s.gitea_owner
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
_headers = {"Authorization": f"token {_s.gitea_token}"}
try:
_resp = _httpx.get(_url, headers=_headers, timeout=10)
_prs = _resp.json()
_pr_number = None
for _pr in _prs:
if _pr.get("head", {}).get("ref") == branch:
_pr_number = _pr["number"]
break
if _pr_number:
passed, reason = qg_func(repo, _pr_number)
else:
# No open PR but review file exists — check file-based
import os
from ..git_worktree import get_worktree_path as _gwp
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
passed, reason = True, "Review file exists (file-based approval)"
else:
passed, reason = False, "No open PR found and no review file"
except Exception as _e:
passed, reason = False, f"Error finding PR: {_e}"
else:
passed, reason = False, f"Unknown QG: {qg_name}"
if not passed:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# Launch agent associated with the current stage's transition
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
logger.error(f"Agent launch failed: {e}")
await asyncio.to_thread(
advance_stage,
task_id,
current_stage,
repo,
work_item_id,
branch,
None,
)
async def _create_gitea_branch(repo: str, branch: str):

View File

@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
the YAML frontmatter only (no fragile substring matching).
"""
import os
import signal
import tempfile
import pytest
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src.agents.launcher import AgentLauncher
from src.qg.checks import check_reviewer_verdict
from src.config import settings
# ---------------------------------------------------------------------------
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
assert passed is False
assert "not found" in reason.lower()
# ---------------------------------------------------------------------------
# ORCH-7 (M-4): dead code removed
# ---------------------------------------------------------------------------
class TestDeadCodeRemoved:
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
removed. _ensure_pr (used by the auto-advance path) must stay."""
def test_auto_merge_pr_is_gone(self):
assert not hasattr(AgentLauncher, "_auto_merge_pr")
def test_ensure_pr_still_present(self):
assert hasattr(AgentLauncher, "_ensure_pr")
# ---------------------------------------------------------------------------
# ORCH-7 (M-2): configurable timeout + per-agent override
# ---------------------------------------------------------------------------
class TestResolveTimeout:
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
def test_default_when_no_override(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
assert AgentLauncher._resolve_timeout("developer") == 1800
assert AgentLauncher._resolve_timeout(None) == 1800
def test_override_for_specific_agent(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
)
assert AgentLauncher._resolve_timeout("reviewer") == 3600
assert AgentLauncher._resolve_timeout("architect") == 2700
# an agent not in the override map falls back to the default
assert AgentLauncher._resolve_timeout("developer") == 1800
def test_malformed_override_falls_back_to_default(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
# must not raise, must return the default
assert AgentLauncher._resolve_timeout("reviewer") == 1800
class TestWatchdogGracefulKill:
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
and ProcessLookupError tolerance. The OS process is fully faked: we record the
signals sent and decide liveness from a script, so no real process is touched."""
def _patch_db(self, monkeypatch):
"""Stub get_db so _record_kill does not need a real DB."""
class _Conn:
def execute(self, *a, **k):
return self
def commit(self):
pass
def close(self):
pass
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
sent.append(sig)
# signal 0 (liveness probe) -> always alive; never raise
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL in sent
# SIGTERM must come before SIGKILL
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
"""Process dies during the grace window -> SIGKILL is NOT sent."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
state = {"alive": True, "probes": 0}
def fake_kill(pid, sig):
if sig == 0:
state["probes"] += 1
# die on the 2nd liveness probe (within grace)
if state["probes"] >= 2:
raise ProcessLookupError
return None
sent.append(sig)
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL not in sent
def test_already_dead_before_sigterm(self, monkeypatch):
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
self._patch_db(monkeypatch)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
if sig == signal.SIGTERM:
raise ProcessLookupError
sent.append(sig)
return None
recorded = {"called": False}
monkeypatch.setattr(
AgentLauncher, "_record_kill",
staticmethod(lambda rid: recorded.__setitem__("called", True)),
)
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
# must not raise
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
assert signal.SIGKILL not in sent
assert recorded["called"] is False

View File

@@ -0,0 +1,92 @@
"""L-2: tests for prune_run_logs (run-log rotation).
Verifies that old / surplus *.log files are removed while fresh logs, non-.log
files, the active log, and subdirectories are left intact. Function is
best-effort and must never raise.
"""
import os
import time
from src.agents.launcher import prune_run_logs
def _touch(path, age_days=0):
with open(path, "w") as f:
f.write("x")
mtime = time.time() - age_days * 86400
os.utime(path, (mtime, mtime))
return path
def test_old_logs_removed_fresh_kept(tmp_path):
runs = tmp_path
fresh = _touch(str(runs / "1.log"), age_days=1)
old = _touch(str(runs / "2.log"), age_days=40)
removed = prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert removed == 1
assert os.path.exists(fresh)
assert not os.path.exists(old)
def test_non_log_files_untouched(tmp_path):
runs = tmp_path
old_log = _touch(str(runs / "stale.log"), age_days=99)
keep_txt = _touch(str(runs / "notes.txt"), age_days=99)
keep_db = _touch(str(runs / "orchestrator.db"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert not os.path.exists(old_log)
assert os.path.exists(keep_txt)
assert os.path.exists(keep_db)
def test_keep_max_retains_newest(tmp_path):
runs = tmp_path
# 5 logs, all recent (within keep_days), increasing age 0..4 days.
paths = []
for i in range(5):
paths.append(_touch(str(runs / f"{i}.log"), age_days=i))
removed = prune_run_logs(str(runs), keep_days=365, keep_max=2)
# Only the 2 newest (age 0, 1) survive.
assert removed == 3
assert os.path.exists(paths[0])
assert os.path.exists(paths[1])
for p in paths[2:]:
assert not os.path.exists(p)
def test_active_log_never_removed(tmp_path):
runs = tmp_path
active = _touch(str(runs / "active.log"), age_days=99)
other = _touch(str(runs / "other.log"), age_days=99)
removed = prune_run_logs(
str(runs), keep_days=30, keep_max=500, active_paths=[active]
)
assert removed == 1
assert os.path.exists(active)
assert not os.path.exists(other)
def test_subdirs_untouched(tmp_path):
runs = tmp_path
sub = runs / "sub.log"
sub.mkdir() # a directory that happens to end in .log
old_log = _touch(str(runs / "old.log"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert sub.is_dir()
assert not os.path.exists(old_log)
def test_missing_dir_is_noop(tmp_path):
missing = tmp_path / "does-not-exist"
# Must not raise.
assert prune_run_logs(str(missing)) == 0

181
tests/test_m6_sequence.py Normal file
View File

@@ -0,0 +1,181 @@
"""M-6: work_item_id derived from Plane sequence_id (source of truth = Plane).
Covers:
* fetch_issue_sequence_id returns int on a valid Plane response (mocked httpx);
* returns None on network error / missing field WITHOUT raising;
* handle_work_item_created uses prefix-NNN when seq is available, and falls
back to get_next_work_item_id when seq is None (Plane down => autonomy);
* find_issue_id no longer hardcodes 'ET-' and matches an arbitrary prefix
(e.g. ORCH-005) by sequence_id.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_m6.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import patch, AsyncMock, MagicMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
import src.plane_sync as plane_sync # noqa: E402
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}},'
f' {{"plane_project_id": "{ORCH_PLANE_ID}", "repo": "orchestrator",'
f' "work_item_prefix": "ORCH", "name": "orchestrator"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _mock_resp(json_body, status=200):
m = MagicMock()
m.json.return_value = json_body
m.raise_for_status.return_value = None
if status >= 400:
def _raise():
raise RuntimeError(f"HTTP {status}")
m.raise_for_status.side_effect = _raise
return m
# ---------------------------------------------------------------------------
# fetch_issue_sequence_id
# ---------------------------------------------------------------------------
def test_fetch_sequence_id_returns_int():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"sequence_id": 42})):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq == 42
assert isinstance(seq, int)
def test_fetch_sequence_id_network_error_returns_none():
with patch.object(plane_sync.httpx, "get", side_effect=RuntimeError("connection refused")):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq is None # must not raise
def test_fetch_sequence_id_missing_field_returns_none():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"error": "not found"})):
seq = plane_sync.fetch_issue_sequence_id("missing-uuid", "proj-uuid")
assert seq is None
# ---------------------------------------------------------------------------
# handle_work_item_created: seq available -> prefix-NNN
# ---------------------------------------------------------------------------
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "work_item.created",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
},
},
)
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=7)
def test_created_uses_plane_sequence_id(mock_fetch, mock_branch, mock_docs, mock_launcher):
mock_launcher.launch.return_value = 1
resp = _post("seq-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='seq-issue'").fetchone()
conn.close()
assert task is not None
assert task["work_item_id"] == "ORCH-007"
mock_fetch.assert_called_once()
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
@patch("src.webhooks.plane.get_next_work_item_id", return_value="ORCH-099")
def test_created_falls_back_to_db_when_plane_down(
mock_next, mock_fetch, mock_branch, mock_docs, mock_launcher
):
"""Plane unavailable (seq=None) => fall back to DB increment; task still created."""
mock_launcher.launch.return_value = 1
resp = _post("fallback-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='fallback-issue'").fetchone()
conn.close()
assert task is not None # autonomy: Plane down does not block creation
assert task["work_item_id"] == "ORCH-099"
mock_next.assert_called_once()
# ---------------------------------------------------------------------------
# find_issue_id: no hardcoded ET- prefix, matches arbitrary prefix by seq
# ---------------------------------------------------------------------------
def test_find_issue_id_matches_arbitrary_prefix_by_sequence():
"""ORCH-005 must resolve via the issue whose sequence_id == 5 (no ET- assumption)."""
issues = {"results": [
{"id": "uuid-a", "sequence_id": 3, "name": "something"},
{"id": "uuid-b", "sequence_id": 5, "name": "ORCH-005: target"},
{"id": "uuid-c", "sequence_id": 9, "name": "other"},
]}
# No DB row for this work_item_id => goes to the Plane API search branch.
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ORCH-005", project_id="proj-uuid")
assert found == "uuid-b"
def test_find_issue_id_matches_et_prefix_too():
"""Backward compat: ET-002 still resolves by sequence_id == 2."""
issues = {"results": [
{"id": "uuid-x", "sequence_id": 2, "name": "ET item"},
{"id": "uuid-y", "sequence_id": 7, "name": "other"},
]}
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ET-002", project_id="proj-uuid")
assert found == "uuid-x"

View File

@@ -0,0 +1,99 @@
"""Tests for per-agent Plane comment authorship (feat: per-agent bot author).
Covers:
* _headers_for: role -> bot token; None/unknown/empty token -> shared fallback.
* add_comment: author is propagated into the POST headers; no author keeps
backward-compatible behaviour (shared orchestrator token).
GET/PATCH calls are intentionally NOT covered here: they stay on the shared
token by design and are unchanged by this feature.
"""
import os
# Set env defaults before importing app modules (same convention as the other
# suites) so config/settings load cleanly without a real .env.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "shared-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from unittest.mock import patch, MagicMock # noqa: E402
from src import plane_sync # noqa: E402
# --------------------------------------------------------------------------- #
# _headers_for
# --------------------------------------------------------------------------- #
def test_headers_for_known_role_uses_bot_token():
"""A known role with a configured token -> that bot's X-API-Key."""
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"analyst": "analyst-tok"}, clear=False):
assert plane_sync._headers_for("analyst") == {"X-API-Key": "analyst-tok"}
def test_headers_for_none_falls_back_to_shared():
"""author=None -> shared orchestrator headers."""
assert plane_sync._headers_for(None) is plane_sync.PLANE_HEADERS
def test_headers_for_unknown_role_falls_back_to_shared():
"""Unknown role -> shared orchestrator headers."""
assert plane_sync._headers_for("nope") is plane_sync.PLANE_HEADERS
def test_headers_for_empty_token_falls_back_to_shared():
"""Known role but empty/unconfigured token -> shared orchestrator headers."""
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"tester": ""}, clear=False):
assert plane_sync._headers_for("tester") is plane_sync.PLANE_HEADERS
def test_headers_for_empty_string_author_falls_back_to_shared():
"""author='' -> shared orchestrator headers."""
assert plane_sync._headers_for("") is plane_sync.PLANE_HEADERS
# --------------------------------------------------------------------------- #
# add_comment
# --------------------------------------------------------------------------- #
def _mock_post_ok():
resp = MagicMock()
resp.raise_for_status.return_value = None
return resp
def test_add_comment_with_author_posts_with_bot_headers():
"""add_comment(author='developer') -> httpx.post called with the developer
bot's X-API-Key header."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.dict(plane_sync.PLANE_BOT_TOKENS, {"developer": "dev-tok"}, clear=False), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello", author="developer")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] == {"X-API-Key": "dev-tok"}
def test_add_comment_without_author_uses_shared_token():
"""add_comment without author -> shared orchestrator headers (backward
compatible)."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
def test_add_comment_unknown_author_uses_shared_token():
"""add_comment with an unknown role -> shared orchestrator headers."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello", author="ghost")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] is plane_sync.PLANE_HEADERS

395
tests/test_stage_engine.py Normal file
View File

@@ -0,0 +1,395 @@
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
These verify the MERGED behavior of what used to be two diverged
_try_advance_stage implementations (launcher sync + plane async):
* happy-path advance for every stage launches the CORRECT agent
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
* a QG failure does not advance;
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
* developer retries > 3 -> telegram alert, no further enqueue;
* tester FAIL -> rollback to development + enqueue developer;
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
* launcher AND plane both delegate to the engine.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
the engine runs against a real isolated sqlite DB.
"""
import os
import tempfile
import pytest
# Isolated test DB (same convention as the other suites).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock, patch # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
from src.stages import get_agent_for_stage # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
"""Fresh isolated DB per test."""
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
"""Mock all Plane/Telegram/notification side effects in the engine.
Everything imported into src.stage_engine that touches the network or sends
a message becomes a no-op MagicMock so tests are deterministic and offline.
"""
for name in (
"notify_stage_change",
"notify_qg_failure",
"notify_approve_requested",
"send_telegram",
"plane_notify_stage",
"plane_notify_qg",
"plane_add_comment",
"set_issue_in_review",
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _add_developer_runs(task_id, n):
conn = get_db()
for _ in range(n):
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(task_id,),
)
conn.commit()
conn.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
"""The fixed agent-selection: when advancing FROM current_stage, the engine
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
"""
@pytest.mark.parametrize(
"current_stage,expected_next,expected_agent",
[
("architecture", "development", "developer"),
("development", "review", "reviewer"),
("review", "testing", "tester"),
("testing", "deploy", "deployer"),
],
)
def test_advance_launches_current_stage_agent(
self, monkeypatch, current_stage, expected_next, expected_agent
):
# All QG checks pass for this happy-path suite.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task(current_stage)
res = advance_stage(
task_id, current_stage, "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == expected_next
assert _stage(task_id) == expected_next
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
assert expected_agent == get_agent_for_stage(current_stage)
assert res.enqueued_agent == expected_agent
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == expected_agent
def test_deploy_to_done_no_agent(self, monkeypatch):
"""deploy -> done advances but launches no agent (terminal-ish)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is True
assert _stage(task_id) == "done"
assert res.enqueued_agent is None
assert _jobs() == []
def test_done_is_terminal(self):
task_id = _make_task("done")
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is False
assert _stage(task_id) == "done"
# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
def test_qg_fail_keeps_stage(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.qg_passed is False
assert _stage(task_id) == "architecture"
assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
)
task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert stage_engine.notify_qg_failure.called
assert stage_engine.plane_notify_qg.called
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
"""finished_agent set -> NO generic QG notification (launcher parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
_add_developer_runs(task_id, 3) # already at the max
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.send_telegram.called
# No new developer job enqueued past the retry cap.
assert _jobs() == []
# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
)
task_id = _make_task("testing")
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
)
task_id = _make_task("testing")
_add_developer_runs(task_id, 3)
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
# 10-conflict.md must exist in the worktree path the engine inspects.
wt = tmp_path / "wt"
conflict_dir = wt / "docs" / "work-items" / "ET-001"
conflict_dir.mkdir(parents=True)
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to == "analysis"
assert _stage(task_id) == "analysis"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "analyst"
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
wt = tmp_path / "wt"
(wt / "docs").mkdir(parents=True)
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to is None
assert _stage(task_id) == "architecture"
assert _jobs() == []
# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
)
task_id = _make_task("analysis")
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst")
assert res.advanced is False
assert _stage(task_id) == "analysis"
assert stage_engine.set_issue_in_review.called
assert stage_engine.notify_approve_requested.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
def test_launcher_calls_engine(self):
from src.agents.launcher import AgentLauncher
task_id = _make_task("development", branch="feature/ET-777-deleg")
with patch("src.stage_engine.advance_stage") as m:
AgentLauncher()._try_advance_stage(
run_id=1, agent="developer", repo="enduro-trails",
branch="feature/ET-777-deleg",
)
m.assert_called_once()
kwargs = m.call_args.kwargs
assert kwargs["task_id"] == task_id
assert kwargs["current_stage"] == "development"
assert kwargs["finished_agent"] == "developer"
def test_plane_calls_engine(self):
import asyncio
from src.webhooks import plane as plane_mod
with patch("src.stage_engine.advance_stage") as m:
asyncio.run(
plane_mod._try_advance_stage(
task_id=5, current_stage="analysis", repo="enduro-trails",
work_item_id="ET-001", branch="feature/ET-001-x",
)
)
m.assert_called_once()
# plane passes positional args; finished_agent (last positional) is None.
args = m.call_args.args
assert args[0] == 5
assert args[1] == "analysis"
assert args[-1] is None

277
tests/test_webhook_dedup.py Normal file
View File

@@ -0,0 +1,277 @@
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
A retried/replayed webhook delivery must be processed exactly once. We mock
enqueue_job (imported into the gitea/plane module namespaces) and assert its
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
guarantee by re-enabling the secret.
"""
import os
import tempfile
from unittest.mock import patch, AsyncMock
import pytest
# Override DB path + project registry BEFORE importing app (same pattern as
# tests/test_webhooks.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import db as db_module # noqa: E402
from src.webhooks import gitea as gitea_mod # noqa: E402
from src.webhooks import plane as plane_mod # noqa: E402
from src import projects as projects_mod # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
# settings is a process-wide singleton; another test module may have fixed
# settings.db_path to its own file at import time. get_db() reads it live, so
# pin it to OUR db for the duration of each test here.
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
@pytest.fixture(autouse=True)
def proj_registry():
"""Pin the shared project registry to proj-1/enduro-trails.
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
built at import; test_projects.py rebuilds it via reload_projects(), which can
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
ignore our fixtures. Force ours for each test, then rebuild after.
"""
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
projects_mod.reload_projects()
yield
projects_mod.reload_projects()
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
"""Bypass HMAC so dedup behavior (not signing) is under test.
settings is shared, so override the secret on the module-level settings that
each verify_* function reads.
"""
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
yield
client = TestClient(app)
def _events_count():
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
return n
# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------
def test_migration_adds_delivery_id_and_index():
"""events has delivery_id + a partial unique index idx_events_delivery."""
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
conn.close()
assert "delivery_id" in cols
assert "idx_events_delivery" in idxs
def test_migration_on_old_db_without_column_does_not_crash():
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
if os.path.exists(_test_db):
os.unlink(_test_db)
import sqlite3
conn = sqlite3.connect(_test_db)
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
conn.executescript(
"""
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
source TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
"""
)
conn.commit()
conn.close()
# Should add the column + index without raising and keep the legacy rows.
init_db()
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
assert "delivery_id" in cols
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
# ---------------------------------------------------------------------------
# Gitea dedup
# ---------------------------------------------------------------------------
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
# Task at architecture so the ADR push would enqueue.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
)
conn.commit()
conn.close()
body = {
"ref": "refs/heads/feature/ET-100-x",
"repository": {"name": "enduro-trails"},
"commits": [
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
],
}
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
# Same delivery id again -> duplicate, no new enqueue, no new event row.
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
r2 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "accepted"
assert _events_count() == 2
def test_gitea_fallback_hash_when_no_delivery_header():
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# Plane dedup
# ---------------------------------------------------------------------------
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
body = {
"event": "work_item.created",
"data": {
"id": "pd-001",
"name": "Dedup plane task",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-1",
},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1 # not re-enqueued
assert _events_count() == 1
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
body = {
"event": "work_item.created",
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "ignored"
# Event WAS logged (dedup happens before the project filter), so a retry of the
# SAME body is a duplicate, not re-evaluated.
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# HMAC still guarded (acceptance #4) — independent of the dedup path
# ---------------------------------------------------------------------------
def test_gitea_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/gitea",
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
)
assert r.status_code == 401
def test_plane_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/plane",
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
headers={"X-Plane-Signature": "deadbeef"},
)
assert r.status_code == 401