Compare commits

...

24 Commits

Author SHA1 Message Date
Dev Agent
7fd6529a35 test(conftest): mute Telegram in all tests to stop prod leakage
A pytest run on prod was sending REAL Telegram messages to Slava: some tests
(e.g. test_webhook_dedup advancing a stage) reach notify_stage_change ->
send_telegram, which read the live .env token/chat_id and actually POSTed.

Add an autouse fixture stubbing send_telegram to a no-op for every test. Patch
the SOURCE src.notifications.send_telegram (covers all notify_* helpers and the
many modules that do a local from .notifications import send_telegram inside
functions) AND src.stage_engine.send_telegram (module-level binding, would not be
intercepted by the source patch alone). webhooks/plane, launcher, queue_worker are
patched defensively with raising=False.

Verified: full suite run with FAKE telegram creds + an un-swallowable httpx.post
trip-wire (BaseException, so send_telegram except Exception can not hide it) shows
ZERO calls to api.telegram.org. Without the fixture the trip-wire fires, proving
the guard is real.
2026-06-03 18:23:09 +03:00
Dev Agent
9a702a0216 feat(metrics): per-agent token/cost accounting
Feature 4. claude is now launched with --output-format json; the run-log trailing
result JSON is parsed (defensively, never fatal) for usage + total_cost_usd. New
idempotent ALTERs add input_tokens/output_tokens/cache_read_tokens/cost_usd to
agent_runs; the launcher monitor records usage per run, posts a per-agent finish
comment under that agent bot (e.g. Developer gotov · 45.2k in / 12.1k out · $0.21),
and the deployer posts an end-of-task summary (SUM over agent_runs GROUP BY agent)
on done. New src/usage.py holds parse/format/record/summary helpers; test_usage.py
covers parsing a real CLI JSON blob, NULL-on-garbage, recording, formatting, and the
per-task aggregate.
2026-06-03 18:18:46 +03:00
Dev Agent
38a741d24e feat(webhook): verdict via Approved/Rejected statuses (variant B)
Feature 2. The issue updated dispatch (shipped with the status-trigger handler)
also routes Approved -> _try_advance_stage (== :approved: comment) and Rejected ->
_rollback_stage (== :rejected: comment). The :rejected: comment branch was
refactored into the shared _rollback_stage so both mechanisms behave identically;
a status reject passes Reason: (rejected via status, see latest comment) since no
inline reason arrives with a status change. Comments stay fully working. This
commit adds test_verdict_status.py proving both status and comment paths funnel
into the same advance/rollback logic.
2026-06-03 18:18:36 +03:00
Dev Agent
09b1c5e1b9 feat(webhook): start pipeline on In Progress status (not on create)
Feature 1. work_item.created no longer starts the pipeline (soft QG-0 log only);
the issue stays in the backlog until moved to In Progress. The pipeline-start body
is extracted into start_pipeline(); a new issue updated handler routes a state
change to In Progress -> handle_status_start, which is idempotent: an existing task
for the plane_id is NOT re-created or restarted (protects handle_comment, which also
flips issues to In Progress). Real Plane payload: event=issue, action=updated,
data.state.id. Existing m6/plane_webhook/dedup tests updated to drive the new
trigger; new test_status_trigger.py covers created-no-op / start / idempotent.
2026-06-03 18:18:26 +03:00
Dev Agent
a4668c0303 feat(plane): stage visibility on board + verdict status UUIDs
Feature 3 + Feature 2 infra. Extend the global PLANE_STATES with the 6 new
enduro status UUIDs (architecture/development/review/testing + approved/rejected),
remap STAGE_TO_STATE so the 4 mid-pipeline stages move the issue across its own
board column instead of all sitting in In Progress, and add the
set_issue_stage_state() helper. Needs Input / In Review / Blocked keep their own
explicit setters and stay higher priority. TODO(ORCH-10): statuses are per-project;
resolve per project when more projects are onboarded.
2026-06-03 18:18:17 +03:00
e9fd30528f Merge pull request 'feat(plane): per-agent bot authorship for comments' (#9) from feature/plane-per-agent-author into main 2026-06-03 10:55:29 +03:00
Dev Agent
d305521067 feat(plane): per-agent bot authorship for comments
add_comment now accepts an optional author (agent role) and POSTs under the matching Plane bot token via _headers_for(), so Plane shows the real author (Analyst/Architect/Developer/Reviewer/Tester/Deployer/Stream) instead of a single shared account. Unknown/empty roles or missing tokens fall back to the shared orchestrator token (autonomy preserved). GET/PATCH (find_issue_id, set_state) are unchanged and stay on the shared token. Call sites in stage_engine, launcher, webhooks/plane and the plane_sync notify helpers now pass author by stage role; stage transitions use stream. Adds tests/test_plane_author.py.
2026-06-03 10:53:25 +03:00
Dev Agent
30d6dd0557 feat(config): add per-agent Plane bot token settings
Add 7 optional bot-token fields (plane_bot_analyst..stream) read from the ORCH_PLANE_BOT_* env vars, default empty. Required for per-agent comment authorship; empty values fall back to the shared orchestrator token.
2026-06-03 10:53:17 +03:00
12e2691a24 Merge pull request 'M-6: derive work_item_id from Plane sequence_id' (#8) from feature/ORCH-M6-plane-sequence into main 2026-06-03 10:04:32 +03:00
Dev Agent
c431a3d055 fix(plane_sync): drop hardcoded ET- prefix in find_issue_id (M-6) 2026-06-03 10:02:15 +03:00
Dev Agent
1d978caea7 feat(webhook): derive work_item_id from Plane sequence_id (M-6) 2026-06-03 10:02:15 +03:00
be27f506e3 Merge pull request 'ORCH cleanup L-1/L-2/L-3: stages comment, prune run logs, emoji constants' (#7) from feature/ORCH-cleanup-L1L2L3 into main 2026-06-03 09:55:53 +03:00
Dev Agent
8f11971bfc refactor(plane_sync): extract emoji literals to constants (L-3) 2026-06-03 09:54:43 +03:00
Dev Agent
0653c2437f feat(launcher): prune old run logs (L-2) 2026-06-03 09:53:55 +03:00
Dev Agent
48b7707eb3 docs(stages): fix misleading STAGE_TRANSITIONS comment (L-1) 2026-06-03 09:51:46 +03:00
2fdc6856ba Merge pull request 'ORCH-5: webhook delivery dedup (M-7)' (#6) from feature/ORCH-5-webhook-dedup into main 2026-06-03 09:20:38 +03:00
Dev Agent
4ac449ff63 test(webhook): cover delivery dedup + migration safety (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
e6a7c6de8d feat(webhook): dedup deliveries by delivery_id (M-7) 2026-06-03 09:18:02 +03:00
Dev Agent
0b924208dc feat(db): add events.delivery_id + partial unique index (M-7) 2026-06-03 09:18:02 +03:00
2f0fd24670 Merge pull request 'ORCH-4: unified stage-engine (M-3)' (#5) from feature/ORCH-4-stage-engine into main 2026-06-03 08:59:51 +03:00
Dev Agent
6abdc220d2 test(stage): cover unified stage_engine + launcher/plane delegation
18 tests: happy-path advance per stage with correct agent (ORCH-4 fix),
QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL
rollback+retry/block, architect conflict rollback to analysis, analyst
approved-flow no-advance, and launcher+plane both delegating to the engine.
2026-06-03 08:56:25 +03:00
Dev Agent
51401a3ba9 refactor(launcher,plane): delegate stage advance to stage_engine
launcher._try_advance_stage and plane._try_advance_stage are now thin
wrappers over stage_engine.advance_stage. The plane webhook calls the sync
engine via asyncio.to_thread so there is exactly one implementation. The
launcher forwards finished_agent so the agent-specific rollback branches still
fire; the webhook passes None (human :approved:), matching prior behavior.

Also fixes the agent-selection bug in the launcher path: it used to enqueue
get_agent_for_stage(next_stage) (skipping a stage, e.g. analysis->architecture
launched developer instead of architect). The unified engine uses
get_agent_for_stage(current_stage), consistent with plane and gitea.
2026-06-03 08:56:25 +03:00
Dev Agent
0befc49b1e refactor(stage): extract unified stage_engine.advance_stage (M-3)
Merge the two diverged _try_advance_stage implementations (launcher sync +
plane async) into one synchronous engine. Preserves all launcher business
logic (analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester
FAIL rollback+retry, architect conflict rollback) and the plane
check_review_approved PR-by-branch dispatch. Unifies the QG signature
dispatch. Fixes agent selection: advancing FROM current_stage launches
get_agent_for_stage(current_stage), not next_stage.
2026-06-03 08:56:14 +03:00
fd554c8a5a Merge pull request 'ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout)' (#4) from feature/ORCH-7-hardening into main 2026-06-03 08:31:26 +03:00
22 changed files with 3068 additions and 341 deletions

View File

@@ -16,6 +16,62 @@ from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment
logger = logging.getLogger("orchestrator.launcher")
def prune_run_logs(runs_dir, keep_days=30, keep_max=500, active_paths=None):
"""L-2: best-effort rotation of per-run logs (<runs_dir>/*.log).
A log file is removed if it is older than keep_days OR it is not within the
keep_max most-recent logs (whichever condition is met first). Only *.log
files directly inside runs_dir are considered; non-.log files and
subdirectories are never touched. Files whose path is in active_paths (the
currently running log) are always kept.
Returns the number of files removed. Never raises: any error is logged and
swallowed so log rotation can never bring the app down.
"""
removed = 0
try:
active = set()
for ap in (active_paths or []):
try:
active.add(os.path.realpath(ap))
except Exception:
active.add(ap)
if not os.path.isdir(runs_dir):
return 0
logs = []
for name in os.listdir(runs_dir):
if not name.endswith(".log"):
continue
path = os.path.join(runs_dir, name)
if not os.path.isfile(path):
continue
if os.path.realpath(path) in active:
continue
try:
mtime = os.path.getmtime(path)
except OSError:
continue
logs.append((path, mtime))
logs.sort(key=lambda t: t[1], reverse=True)
cutoff = time.time() - keep_days * 86400
for idx, (path, mtime) in enumerate(logs):
too_old = mtime < cutoff
over_max = idx >= keep_max
if too_old or over_max:
try:
os.remove(path)
removed += 1
except OSError as e:
logger.warning(f"prune_run_logs: failed to remove {path}: {e}")
except Exception as e:
logger.warning(f"prune_run_logs failed for {runs_dir}: {e}")
return removed
class AgentLauncher:
"""Launch Claude CLI agents directly (binary mounted into container)."""
@@ -153,9 +209,15 @@ class AgentLauncher:
# No git fetch/checkout here: ensure_worktree() already put the worktree on
# the right branch. The agent simply runs inside its isolated work_path.
# Feature 4 (token usage): --output-format json makes claude emit a single
# result JSON (with usage + total_cost_usd) at the end of stdout. The log
# still captures it; _monitor_agent parses the trailing JSON after the run
# to record per-agent tokens/cost. _monitor_agent's failure handling keys
# off the process exit_code (not stdout shape), so this is safe.
cmd = (
f'cd {work_path} && '
f'{self.CLAUDE_BIN} --print '
f'--output-format json '
f'{model_flag}'
f'"$(cat {task_file})" '
f'--system-prompt "$(cat {system_prompt})" '
@@ -344,6 +406,17 @@ class AgentLauncher:
notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s)
# Feature 4: parse token usage / cost from the (json) run log and record
# it on the agent_runs row. Never fatal — a garbled/missing JSON records
# NULLs and logs a warning so a broken run can't crash the monitor.
try:
from ..usage import parse_usage_from_log, record_usage
_usage = parse_usage_from_log(output_path) if output_path else None
record_usage(run_id, _usage)
except Exception as e:
logger.warning(f"run_id={run_id}: usage accounting failed: {e}")
_usage = None
# Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4),
# NOT in the shared /repos/<repo>. The worktree is already on `branch`
# (ensure_worktree did the checkout), so no checkout is needed here.
@@ -415,7 +488,8 @@ class AgentLauncher:
set_issue_blocked(_wid)
plane_add_comment(
_wid,
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="deployer",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")
@@ -433,6 +507,14 @@ class AgentLauncher:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log")
# Feature 4: post the per-agent usage comment under that agent's bot, and
# — for the deployer finishing the task — the per-task usage summary.
if exit_code == 0:
try:
self._post_usage_comments(run_id, agent, repo, branch, _usage)
except Exception as e:
logger.warning(f"run_id={run_id}: usage comment failed: {e}")
# Auto-advance stage if agent finished successfully and QG passes
if exit_code == 0:
self._try_advance_stage(run_id, agent, repo, branch)
@@ -564,7 +646,15 @@ class AgentLauncher:
pass
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
"""After agent finishes successfully, check QG and advance stage if possible."""
"""After agent finishes successfully, advance the stage via the unified engine.
ORCH-4 / M-3: the 174-line body that used to live here moved into
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
the task by (repo, branch) and delegates. `agent` is forwarded as
finished_agent so the analyst/reviewer/tester/architect rollback branches
still trigger exactly as before. The agent-selection bug (it used to call
get_agent_for_stage(next_stage)) is fixed inside the engine.
"""
try:
conn = get_db()
task_row = conn.execute(
@@ -576,178 +666,45 @@ class AgentLauncher:
return
task_id, current_stage, work_item_id = task_row
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
if not next_stage:
return
# Run QG check if defined
if qg_name and qg_name in QG_CHECKS:
check_fn = QG_CHECKS[qg_name]
if qg_name in ("check_analysis_approved",):
# Requires human approval - post request comment if analyst just finished
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
files_check = QG_CHECKS.get("check_analysis_complete")
if files_check:
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review
from ..plane_sync import set_issue_in_review
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
)
notify_approve_requested(task_id)
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
else:
# Check if questions file exists (in the task worktree)
import os as _os
questions_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md"
)
if _os.path.isfile(questions_path):
# Analyst has questions -> Needs Input
from ..plane_sync import set_issue_needs_input
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
)
from ..notifications import send_telegram
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
else:
# No artifacts and no questions
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
)
return
elif qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) signature — already worktree-aware.
passed, reason = check_fn(repo, branch)
elif qg_name == "check_tests_passed":
# Artifact check — pass branch so it reads from the worktree.
passed, reason = check_fn(repo, work_item_id or "", branch)
else:
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
passed, reason = check_fn(repo, work_item_id or "", branch)
if not passed:
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
# If reviewer says REQUEST_CHANGES, rollback to development
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
# Count retries
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
logger.error(f"Task {task_id}: max retries reached")
# Task 6: Tester FAIL -> rollback to development
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
)
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
# Task 8: Architect conflict -> rollback to analysis
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
import os as _os
conflict_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md"
)
if _os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
return
return
elif qg_name:
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
# Launch next agent if defined
next_agent = get_agent_for_stage(next_stage)
if next_agent:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
from ..stage_engine import advance_stage
advance_stage(
task_id=task_id,
current_stage=current_stage,
repo=repo,
work_item_id=work_item_id,
branch=branch,
finished_agent=agent,
)
except Exception as e:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
def _post_usage_comments(self, run_id, agent, repo, branch, usage):
"""Feature 4: post the per-agent usage comment (and Deployer summary).
- Always (on success, with a work_item_id): a per-agent finish comment
with token/cost, authored by the finishing agent's Plane bot.
- When the deployer finishes: also a per-task summary (SUM over
agent_runs GROUP BY agent), authored by the deployer.
"""
from ..usage import usage_comment, task_summary_comment
conn = get_db()
row = conn.execute(
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
(repo, branch),
).fetchone()
conn.close()
if not row:
return
task_id, work_item_id = row[0], row[1]
if not work_item_id:
return
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
if agent == "deployer":
plane_add_comment(
work_item_id, task_summary_comment(task_id), author="deployer"
)
def _ensure_pr(self, repo: str, branch: str, run_id: int):
import httpx
owner = settings.gitea_owner

View File

@@ -9,6 +9,17 @@ class Settings(BaseSettings):
plane_webhook_secret: str = ""
plane_project_id: str = ""
# Per-agent Plane bot tokens (feat: per-agent comment authorship).
# When set, add_comment posts under the matching bot so Plane shows the
# real author (Analyst/Architect/...). Empty -> fallback to plane_api_token.
plane_bot_analyst: str = ""
plane_bot_architect: str = ""
plane_bot_developer: str = ""
plane_bot_reviewer: str = ""
plane_bot_tester: str = ""
plane_bot_deployer: str = ""
plane_bot_stream: str = ""
# Gitea
gitea_url: str = "http://localhost:3000"
gitea_token: str = ""
@@ -66,6 +77,15 @@ class Settings(BaseSettings):
agent_kill_grace_seconds: int = 20
agent_timeout_overrides_json: str = ""
# L-2: run-log rotation. Old per-run logs in <data>/runs/*.log are pruned at
# app startup (best-effort). A *.log is removed if it is older than
# log_keep_days OR not within the log_keep_max most-recent logs (whichever
# hits first). Only *.log files are touched; the active run log is skipped.
# log_keep_days -> max age in days (env ORCH_LOG_KEEP_DAYS).
# log_keep_max -> max number of newest logs to retain (env ORCH_LOG_KEEP_MAX).
log_keep_days: int = 30
log_keep_max: int = 500
# Telegram notifications
telegram_bot_token: str = ""

View File

@@ -67,6 +67,24 @@ def init_db():
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
# (which have NULL delivery_id) never collide with each other. Restart-safe:
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
_ensure_column(conn, "events", "delivery_id", "TEXT")
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
)
# Feature 4 (token usage): per-run token / cost accounting. Parsed from the
# claude --output-format json result by the launcher monitor. Idempotent
# ALTERs (no-op once the columns exist) so this is safe on the live prod DB.
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
conn.commit()
conn.close()
@@ -141,6 +159,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------
def insert_event_dedup(
source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
"""Idempotently log a webhook event keyed by delivery_id.
Returns True if a NEW row was inserted (caller should dispatch the event) and
False if this delivery_id was already present (a duplicate delivery -> caller
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
index idx_events_delivery; rowcount==1 means the row was actually inserted.
"""
conn = get_db()
try:
cur = conn.execute(
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
"VALUES (?, ?, ?, ?)",
(source, event_type, payload, delivery_id),
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------

View File

@@ -60,6 +60,22 @@ async def lifespan(app: FastAPI):
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
from .config import settings as _settings
from .agents.launcher import prune_run_logs
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
_removed = prune_run_logs(
_runs_dir,
keep_days=_settings.log_keep_days,
keep_max=_settings.log_keep_max,
)
if _removed:
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
except Exception as e:
log.warning(f"Log rotation skipped: {e}")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()

View File

@@ -6,9 +6,53 @@ from .config import settings
logger = logging.getLogger("orchestrator.plane_sync")
# L-3: emoji literals used in Plane comment bodies, named for readability.
# Message text stays byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504" # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
EMOJI_DONE = "\u2705" # task completed
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
WORKSPACE = settings.plane_workspace_slug
# feat(plane): per-agent comment authorship.
# Map an agent role -> its dedicated Plane bot token (read from config / env).
# When the token is present, add_comment() POSTs under that bot so Plane shows
# the real author. Empty/unknown role -> fallback to the shared orchestrator
# token (PLANE_HEADERS), so commenting stays autonomous.
PLANE_BOT_TOKENS = {
"analyst": settings.plane_bot_analyst,
"architect": settings.plane_bot_architect,
"developer": settings.plane_bot_developer,
"reviewer": settings.plane_bot_reviewer,
"tester": settings.plane_bot_tester,
"deployer": settings.plane_bot_deployer,
"stream": settings.plane_bot_stream,
}
# Map a pipeline stage -> the agent role that owns work in that stage. Used to
# pick an author for rollback/stage notifications targeting a specific stage.
STAGE_AUTHORS = {
"analysis": "analyst",
"architecture": "architect",
"development": "developer",
"review": "reviewer",
"testing": "tester",
"deploy": "deployer",
}
def _headers_for(author: str | None) -> dict:
"""Return X-API-Key headers for the given agent role.
Falls back to the shared orchestrator token (PLANE_HEADERS /
settings.plane_api_token) when the role is None, unknown, or its bot token
is not configured. This keeps comment posting autonomous: a comment is
always written, just attributed to the orchestrator if no bot is set.
"""
tok = PLANE_BOT_TOKENS.get(author or "") if author else None
return {"X-API-Key": tok} if tok else PLANE_HEADERS
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
@@ -40,7 +84,12 @@ def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str
logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
return PROJECT_ID
# Plane state IDs
# Plane state IDs.
# TODO(ORCH-10): these UUIDs are PER-PROJECT. The 6 stage-visibility / verdict
# statuses below were created only in the enduro project (7a79f0a9-...). One
# project is in prod today, so a single global dict is acceptable. When more
# projects are onboarded these must be resolved per project (see ORCH-10 in
# BACKLOG.md / the ORCH-6 project registry) — do NOT hardcode globally then.
PLANE_STATES = {
"backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
"todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
@@ -50,21 +99,62 @@ PLANE_STATES = {
"blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
"cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
# Feature 3 (stage visibility) — per-stage statuses on the board.
"architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
"development": "9920609b-f140-4e46-ab95-89acda8412c8",
"review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
"testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
# Feature 2 (verdict statuses) — Approved / Rejected.
"approved": "a519a341-dada-4a91-8910-7604f82b79c5",
"rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
}
# Map orchestrator stages to Plane states
# Feature 3: map an orchestrator stage -> the Plane status to show on the board
# when the pipeline ENTERS that stage. analysis stays driven by the existing
# in_progress/in_review/needs_input logic (no dedicated status). deploy keeps
# in_progress until done. Needs Input / In Review / Blocked remain higher
# priority and are set explicitly elsewhere — do NOT override them from here.
STAGE_VISIBILITY_STATE = {
"architecture": "architecture",
"development": "development",
"review": "review",
"testing": "testing",
}
# Map orchestrator stages to Plane states (used by update_issue_state /
# notify_stage_change). Feature 3: architecture/development/review/testing now
# point at their dedicated board statuses so the task physically moves across
# columns. analysis -> in_progress, deploy -> in_progress, done -> done.
STAGE_TO_STATE = {
"created": PLANE_STATES["todo"],
"analysis": PLANE_STATES["in_progress"],
"architecture": PLANE_STATES["in_progress"],
"development": PLANE_STATES["in_progress"],
"review": PLANE_STATES["in_progress"],
"testing": PLANE_STATES["in_progress"],
"architecture": PLANE_STATES["architecture"],
"development": PLANE_STATES["development"],
"review": PLANE_STATES["review"],
"testing": PLANE_STATES["testing"],
"deploy": PLANE_STATES["in_progress"],
"done": PLANE_STATES["done"],
}
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
"""M-6: GET the Plane issue by UUID and return its sequence_id (the
authoritative per-project number), or None if unavailable.
Returns None on network error, non-2xx, or a missing field - never raises,
so the webhook handler can fall back to DB increment and stay autonomous.
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
seq = resp.json().get("sequence_id")
return int(seq) if seq is not None else None
except Exception as e:
logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
return None
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)
@@ -89,25 +179,26 @@ def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
resp.raise_for_status()
data = resp.json()
results = data.get("results", data if isinstance(data, list) else [])
# M-6: match by sequence_id directly (the authoritative per-project
# number), parsed from the work_item_id suffix - no hardcoded prefix.
try:
target_num = int(work_item_id.rsplit("-", 1)[1])
except (IndexError, ValueError):
target_num = None
for issue in results:
seq = issue.get("sequence_id")
identifier = f"ET-{seq:03d}" if seq else ""
if identifier == work_item_id or work_item_id in issue.get("name", ""):
if target_num is not None and issue.get("sequence_id") == target_num:
return issue["id"]
# Fallback: get all issues and match by sequence_id number
if work_item_id.startswith("ET-"):
try:
target_num = int(work_item_id.split("-")[1])
except (IndexError, ValueError):
target_num = None
if target_num:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
if work_item_id in issue.get("name", ""):
return issue["id"]
# Fallback: get all issues and match by sequence_id number (any prefix)
if target_num is not None:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
except Exception as e:
logger.error(f"Failed to find issue for {work_item_id}: {e}")
return None
@@ -134,8 +225,14 @@ def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
def add_comment(work_item_id: str, text: str, project_id: str = None):
"""Add a comment to Plane issue."""
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
"""Add a comment to a Plane issue.
feat(plane): when ``author`` (an agent role) maps to a configured bot
token, the comment is POSTed under that bot so Plane shows the real author.
Otherwise it falls back to the shared orchestrator token (see
``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.
"""
project_id = _resolve_project_id(work_item_id, project_id)
issue_id = find_issue_id(work_item_id, project_id)
if not issue_id:
@@ -145,9 +242,9 @@ def add_comment(work_item_id: str, text: str, project_id: str = None):
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
html = f"<p>{text}</p>"
try:
resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10)
resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
resp.raise_for_status()
logger.info(f"Plane: comment added to {work_item_id}")
logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
except Exception as e:
logger.error(f"Failed to add comment to {work_item_id}: {e}")
@@ -173,6 +270,21 @@ def set_issue_in_progress(work_item_id: str, project_id: str = None):
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)
def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None):
"""Feature 3: move the issue to the board status for a pipeline stage.
Only the visible-stage statuses (architecture/development/review/testing)
are driven here — stages without a dedicated status (analysis/deploy) are a
no-op so the existing in_progress/in_review/needs_input logic stays in
charge. By design this does NOT touch Needs Input / In Review / Blocked,
which are higher priority and set explicitly by their own helpers.
"""
state_key = STAGE_VISIBILITY_STATE.get(stage)
if not state_key:
return
_set_issue_state_direct(work_item_id, PLANE_STATES[state_key], project_id)
def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
"""Set issue state directly by state_id."""
project_id = _resolve_project_id(work_item_id, project_id)
@@ -194,7 +306,7 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, new_stage, project_id)
msg = f"🔄 Stage: {old_stage}{new_stage}"
msg = f"{EMOJI_STAGE} Stage: {old_stage}{new_stage}"
if agent:
msg += f" (launching {agent})"
@@ -227,16 +339,29 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
except Exception:
pass
add_comment(work_item_id, msg, project_id)
# Stage transition is the orchestrator's own voice -> attribute to stream.
add_comment(work_item_id, msg, project_id, author="stream")
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
"""Notify Plane about QG failure."""
add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check}{reason}", project_id)
# QG failure belongs to the agent that owns the failing stage.
add_comment(
work_item_id,
f"{EMOJI_QG_FAIL} QG failed at {stage}: {check}{reason}",
project_id,
author=STAGE_AUTHORS.get(stage, "stream"),
)
def notify_done(work_item_id: str, project_id: str = None):
"""Mark issue as Done in Plane."""
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, "done", project_id)
add_comment(work_item_id, "✅ Task completed! PR merged and deployed.", project_id)
# Deploy finished the task -> attribute the completion comment to Deployer.
add_comment(
work_item_id,
f"{EMOJI_DONE} Task completed! PR merged and deployed.",
project_id,
author="deployer",
)

430
src/stage_engine.py Normal file
View File

@@ -0,0 +1,430 @@
"""Unified stage engine (ORCH-4 / M-3).
Single source of truth for "an agent finished / a human approved -> run the
stage's quality gate and either advance the pipeline or roll it back".
Before ORCH-4 this logic was duplicated in two places that had silently
diverged:
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
rollback+retry, architect conflict rollback) — but it picked the next agent
with get_agent_for_stage(next_stage), which is WRONG.
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
check_review_approved PR-by-branch dispatch and used the CORRECT
get_agent_for_stage(current_stage)).
This module merges both into one sync `advance_stage(...)`. launcher calls it
directly; the plane webhook calls it through asyncio.to_thread so there is
exactly one implementation.
Agent-selection bug fix (ORCH-4):
stages.py defines `agent` as "the agent to launch when advancing FROM this
stage". So when advancing current -> next, the correct agent to launch is
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
stage (e.g. analysis->architecture launched 'developer' instead of
'architect'). plane and gitea already used current_stage; we unify on that.
"""
import logging
import os
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .qg.checks import QG_CHECKS
from .notifications import (
notify_stage_change,
notify_qg_failure,
notify_approve_requested,
send_telegram,
)
from .plane_sync import (
notify_stage_change as plane_notify_stage,
notify_qg_failure as plane_notify_qg,
add_comment as plane_add_comment,
set_issue_in_review,
set_issue_needs_input,
set_issue_in_progress,
set_issue_blocked,
)
from .config import settings
logger = logging.getLogger("orchestrator.stage_engine")
MAX_DEVELOPER_RETRIES = 3
@dataclass
class AdvanceResult:
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
advanced: bool = False
from_stage: str | None = None
to_stage: str | None = None
enqueued_agent: str | None = None
enqueued_job_id: int | None = None
qg_name: str | None = None
qg_passed: bool | None = None
qg_reason: str | None = None
rolled_back_to: str | None = None
alerted: bool = False
note: str | None = None
notes: list = field(default_factory=list)
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
"""Dispatch a quality-gate check to the right signature and run it.
Signatures (unified from launcher + plane):
- check_ci_green / check_tests_local -> (repo, branch)
- check_review_approved -> (repo, pr_number) [PR found by branch]
- everything else (artifact checks) -> (repo, work_item_id, branch)
Returns (passed: bool, reason: str).
"""
check_fn = QG_CHECKS.get(qg_name)
if not check_fn:
logger.error(f"QG function '{qg_name}' not found in registry")
return False, f"Unknown QG: {qg_name}"
if qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) — already worktree-aware.
return check_fn(repo, branch)
if qg_name == "check_review_approved":
# Special case kept from plane: find the open PR for this branch via
# Gitea, then check it; fall back to a file-based review marker.
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
# check reads from the task worktree (ORCH-2 / S-4).
return check_fn(repo, work_item_id or "", branch)
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
"""check_review_approved dispatch preserved from plane._try_advance_stage.
Finds the open PR whose head ref == branch via the Gitea API and runs
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
file-based review marker (12-review.md / 09-review.md) like the original.
"""
import httpx as _httpx
owner = settings.gitea_owner
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
headers = {"Authorization": f"token {settings.gitea_token}"}
try:
resp = _httpx.get(url, headers=headers, timeout=10)
prs = resp.json()
pr_number = None
for pr in prs:
if pr.get("head", {}).get("ref") == branch:
pr_number = pr["number"]
break
if pr_number:
return check_fn(repo, pr_number)
# No open PR but a review file may exist — check file-based.
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = os.path.join(settings.repos_dir, repo)
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(review_path) or os.path.isfile(review_path2):
return True, "Review file exists (file-based approval)"
return False, "No open PR found and no review file"
except Exception as e:
return False, f"Error finding PR: {e}"
def _developer_retry_count(task_id: int) -> int:
"""How many developer runs have already happened for this task."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,),
).fetchone()[0]
conn.close()
return n
def advance_stage(
task_id: int,
current_stage: str,
repo: str,
work_item_id: str,
branch: str,
finished_agent: str | None = None,
) -> AdvanceResult:
"""Run the current stage's quality gate and advance / roll back the pipeline.
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
the async plane webhook calls it via asyncio.to_thread.
Args:
task_id: tasks.id
current_stage: the stage the task is currently in
repo: repository name
work_item_id: Plane work item id (may be "" / None)
branch: feature branch
finished_agent: the agent that just finished (launcher path). Drives the
approved/REQUEST_CHANGES/tester/architect branches. In the
plane webhook path it is None, so those agent-specific
branches simply do not trigger (matches old plane behavior).
Returns AdvanceResult describing what happened.
"""
result = AdvanceResult(from_stage=current_stage)
agent = finished_agent
try:
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
result.qg_name = qg_name
result.to_stage = next_stage
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
result.note = "terminal"
return result
# --- Quality gate ----------------------------------------------------
if qg_name and qg_name in QG_CHECKS:
# Human-approval gate: special analyst approved-flow (launcher only).
if qg_name == "check_analysis_approved":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
if not passed:
logger.info(
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
elif qg_name:
# QG name set but not registered — do not advance (launcher behavior).
result.note = f"qg '{qg_name}' not in registry"
return result
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
result.advanced = True
logger.info(
f"Task {task_id}: {current_stage} -> {next_stage} "
f"(auto-advance after {agent})"
)
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
next_agent = get_agent_for_stage(current_stage)
if next_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\n"
f"Branch: {branch}\nStage: {next_stage}"
)
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
result.enqueued_agent = next_agent
result.enqueued_job_id = new_job_id
logger.info(
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
)
return result
except Exception as e:
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
result.note = f"error: {e}"
return result
def _handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
):
"""Analyst approved-flow (launcher only).
Only triggers when the analyst just finished (agent == 'analyst') in the
launcher path. Decides between: artifacts ready -> In Review + request
:approved:; questions file -> Needs Input; otherwise a warning comment.
This gate never advances on its own (human approval does that via the plane
webhook), matching the original launcher behavior.
"""
result.qg_name = "check_analysis_approved"
result.note = "analysis-approval-gate"
if not (agent == "analyst" and work_item_id):
return
files_check = QG_CHECKS.get("check_analysis_complete")
if not files_check:
return
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review, ask for :approved:.
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
author="analyst",
)
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested :approved: in Plane"
)
return
questions_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md",
)
if os.path.isfile(questions_path):
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
author="analyst",
)
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
result.note = "analysis-needs-input"
return
# No artifacts and no questions.
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
author="analyst",
)
result.note = "analysis-empty"
def _handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result: AdvanceResult,
):
"""All rollback/retry branches from the original launcher, preserved verbatim.
Only fire on the launcher path (finished_agent is set). The webhook path
passes finished_agent=None, so none of these agent-specific branches trigger
— that matches the old plane behavior (it just reported the QG failure).
"""
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
f"(job_id={new_job})"
)
else:
send_telegram(
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
f"Manual intervention needed."
)
result.alerted = True
logger.error(f"Task {task_id}: max retries reached")
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
if agent == "tester" and qg_name == "check_tests_passed":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="tester",
)
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
)
else:
set_issue_blocked(work_item_id)
send_telegram(
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
f"retries. Manual intervention needed."
)
result.alerted = True
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
if agent == "architect" and qg_name == "check_architecture_done":
conflict_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md",
)
if os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
result.rolled_back_to = "analysis"
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
author="architect",
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
result.enqueued_agent = "analyst"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: architect conflict, enqueued analyst "
f"(job_id={new_job})"
)

View File

@@ -5,7 +5,7 @@ Stages:
Each stage defines:
- next: the stage to advance to
- agent: the agent to launch when entering the NEXT stage
- agent: the agent to launch when advancing FROM this stage (NOT the next stage's agent)
- qg: the quality gate check required to leave this stage
"""

268
src/usage.py Normal file
View File

@@ -0,0 +1,268 @@
"""Feature 4: token / cost accounting for agent runs.
claude --output-format json emits a single result JSON object at the end of the
run log with fields:
total_cost_usd
usage.input_tokens / output_tokens / cache_read_input_tokens /
cache_creation_input_tokens
modelUsage, num_turns, duration_ms
This module parses that JSON out of a (text-or-json) run log, records the usage
on the agent_runs row, formats a Plane comment for the finishing agent, and
builds the per-task summary the Deployer posts on deploy/done.
Everything here is defensive: a missing/garbled JSON never raises \u2014 we record
NULL/0 and log a warning so a broken agent run can't crash the monitor.
"""
import json
import logging
from .db import get_db
logger = logging.getLogger("orchestrator.usage")
def parse_usage_from_text(text: str) -> dict | None:
"""Extract the claude result-JSON usage from a run log's text.
The log may contain plain text before/after the JSON; with
--output-format json the JSON is the final object. We scan for the LAST
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
Returns a normalised dict
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
"""
if not text:
return None
candidate = _extract_last_json_object(text)
if candidate is None:
return None
usage = candidate.get("usage") or {}
if not isinstance(usage, dict):
usage = {}
cost = candidate.get("total_cost_usd")
if cost is None:
cost = candidate.get("cost_usd")
# If there is neither a usage block nor a cost, this isn't a result object.
if not usage and cost is None:
return None
def _int(v):
try:
return int(v)
except (TypeError, ValueError):
return 0
def _float(v):
try:
return float(v)
except (TypeError, ValueError):
return 0.0
return {
"input_tokens": _int(usage.get("input_tokens")),
"output_tokens": _int(usage.get("output_tokens")),
"cache_read_tokens": _int(
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
),
"cost_usd": _float(cost),
}
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
Scans from the end for '}' and walks back to the matching '{' using a depth
counter (string-aware), trying json.loads on each candidate. Robust to log
lines or text emitted before the JSON.
"""
# Fast path: the whole stripped text is the JSON.
stripped = text.strip()
try:
obj = json.loads(stripped)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
# Otherwise find the last balanced { ... } block.
end = len(text)
while True:
close = text.rfind("}", 0, end)
if close == -1:
return None
depth = 0
in_str = False
esc = False
start = None
for i in range(close, -1, -1):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "}":
depth += 1
elif ch == "{":
depth -= 1
if depth == 0:
start = i
break
if start is not None:
blob = text[start:close + 1]
try:
obj = json.loads(blob)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
end = close # keep scanning earlier in the text
def parse_usage_from_log(path: str) -> dict | None:
"""Read a run log file and parse usage from it. Never raises."""
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return parse_usage_from_text(f.read())
except OSError as e:
logger.warning(f"parse_usage_from_log: cannot read {path}: {e}")
return None
def record_usage(run_id: int, usage: dict | None):
"""Write parsed usage onto the agent_runs row. NULLs if usage is None."""
if usage is None:
logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
usage = {}
conn = get_db()
try:
conn.execute(
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
"cache_read_tokens=?, cost_usd=? WHERE id=?",
(
usage.get("input_tokens"),
usage.get("output_tokens"),
usage.get("cache_read_tokens"),
usage.get("cost_usd"),
run_id,
),
)
conn.commit()
finally:
conn.close()
def fmt_tokens(n) -> str:
"""Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'."""
try:
n = int(n or 0)
except (TypeError, ValueError):
n = 0
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}M"
if n >= 1_000:
return f"{n / 1_000:.1f}k"
return str(n)
def fmt_cost(c) -> str:
"""Format USD cost with 2 decimals: '$0.21'."""
try:
c = float(c or 0.0)
except (TypeError, ValueError):
c = 0.0
return f"${c:.2f}"
# Pretty agent names for comments (mirrors STAGE_AUTHORS roles).
AGENT_DISPLAY = {
"analyst": "Analyst",
"architect": "Architect",
"developer": "Developer",
"reviewer": "Reviewer",
"tester": "Tester",
"deployer": "Deployer",
}
def usage_comment(agent: str, usage: dict | None) -> str:
"""Build the per-agent finish comment, e.g.
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
"""
usage = usage or {}
name = AGENT_DISPLAY.get(agent, agent.capitalize())
icon = AGENT_ICON.get(agent, "\u2705")
return (
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
f"{fmt_tokens(usage.get('input_tokens'))} in / "
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
f"{fmt_cost(usage.get('cost_usd'))}"
)
AGENT_ICON = {
"analyst": "\U0001f50d",
"architect": "\U0001f4d0",
"developer": "\U0001f4bb",
"reviewer": "\U0001f50e",
"tester": "\U0001f9ea",
"deployer": "\U0001f680",
}
def task_usage_summary(task_id: int) -> dict:
"""Aggregate agent_runs usage for a task.
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT agent, "
"COALESCE(SUM(input_tokens),0), "
"COALESCE(SUM(output_tokens),0), "
"COALESCE(SUM(cost_usd),0.0) "
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
(task_id,),
).fetchall()
finally:
conn.close()
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
total_in = sum(r[1] for r in per_agent)
total_out = sum(r[2] for r in per_agent)
total_cost = sum(r[3] for r in per_agent)
return {
"total_in": total_in,
"total_out": total_out,
"total_cost": total_cost,
"per_agent": per_agent,
}
def task_summary_comment(task_id: int) -> str:
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
s = task_usage_summary(task_id)
lines = [
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
f"{fmt_cost(s['total_cost'])}"
]
for agent, ti, to, cost in s["per_agent"]:
name = AGENT_DISPLAY.get(agent, agent.capitalize())
lines.append(
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
)
return "\n".join(lines)

52
src/webhooks/_dedup.py Normal file
View File

@@ -0,0 +1,52 @@
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
manual replay. Without idempotency a retried delivery re-enters the pipeline and
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
repo). This module computes a stable per-delivery id so the webhook handlers can
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
body (a retried identical body yields the same hash).
"""
import hashlib
def _sha256_hex(*parts: str) -> str:
h = hashlib.sha256()
for p in parts:
h.update(p.encode("utf-8", "replace"))
return h.hexdigest()
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
"""Compute the delivery_id for a Gitea webhook.
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
sha256(source + event_type + body) so a retried identical body still maps to
one id even if Gitea omitted the header.
"""
raw = (headers.get("X-Gitea-Delivery") or "").strip()
if not raw:
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
return f"gitea:{raw}"
def plane_delivery_id(headers, body: bytes) -> str:
"""Compute the delivery_id for a Plane webhook.
Plane does not reliably send a delivery header, so we try a couple of common
names and otherwise fall back to sha256("plane" + body): a retried identical
body yields the same id.
"""
raw = (
headers.get("X-Plane-Delivery")
or headers.get("X-Hook-Delivery")
or ""
).strip()
if not raw:
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
return f"plane:{raw}"

View File

@@ -10,7 +10,14 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import gitea_delivery_id
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
# Runs AFTER HMAC verification above.
event_type = request.headers.get("X-Gitea-Event", "unknown")
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("gitea", event_type, body.decode()),
)
conn.commit()
conn.close()
delivery_id = gitea_delivery_id(request.headers, event_type, body)
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
if event_type == "push":
await handle_push(payload)

View File

@@ -15,7 +15,9 @@ from ..db import (
get_next_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("plane", payload.get("event", "unknown"), body.decode()),
)
conn.commit()
conn.close()
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
# project still falls through to the filter below and returns {"status":"ignored"}.
event_type = payload.get("event", "unknown")
delivery_id = plane_delivery_id(request.headers, body)
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
event = payload.get("event")
action = payload.get("action", "")
@@ -86,38 +92,139 @@ async def plane_webhook(request: Request):
return {"status": "ignored", "reason": "unknown project"}
if (event == "work_item.created") or (event == "issue" and action == "created"):
# Feature 1: creation NO LONGER starts the pipeline. Slava keeps the
# backlog until he moves an issue to In Progress. We only run a soft
# QG-0 sanity log here (no branch, no analyst, no task row).
await handle_work_item_created(data, project_id)
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
# Feature 1 & 2: status changes drive the pipeline.
# Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent)
# -> Approved : advance (== :approved: comment)
# -> Rejected : rollback (== :rejected: comment)
await handle_issue_updated(data, project_id)
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
await handle_comment(data, project_id)
return {"status": "accepted"}
async def handle_work_item_created(data: dict, project_id: str = ""):
def _state_id(data: dict) -> str:
"""Extract the new Plane state UUID from an 'issue updated' payload.
Real payload (verified from prod events): data.state is
{id, name, color, group}. Some payloads carry state as a bare UUID string.
"""
New work item created in Plane.
QG-0: validate title, description, priority.
If valid: create branch, init docs, launch analyst.
If invalid: comment with what's missing, set Blocked.
state = data.get("state")
if isinstance(state, dict):
return state.get("id", "") or ""
if isinstance(state, str):
return state
return ""
async def handle_issue_updated(data: dict, project_id: str = ""):
"""Feature 1 & 2: react to a Plane issue status change.
Routes the NEW state UUID (data.state.id) to:
- in_progress : start the pipeline if this issue has no task yet
(idempotent — an existing task is NOT restarted; protects handle_comment
which also flips issues to In Progress during approve/answer flows).
- approved : same as a :approved: comment (advance current stage).
- rejected : same as a :rejected: comment (rollback + relaunch).
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
is ignored here — those are statuses the orchestrator itself sets.
"""
from ..plane_sync import PLANE_STATES
plane_id = str(data.get("id") or "")
new_state = _state_id(data)
if not plane_id or not new_state:
logger.info("issue updated without id/state, ignoring")
return
if new_state == PLANE_STATES["in_progress"]:
await handle_status_start(data, project_id)
elif new_state == PLANE_STATES["approved"]:
await handle_verdict(data, project_id, approved=True)
elif new_state == PLANE_STATES["rejected"]:
await handle_verdict(data, project_id, approved=False)
else:
logger.info(f"issue {plane_id} updated to state {new_state[:8]}..., no pipeline action")
async def handle_status_start(data: dict, project_id: str = ""):
"""Feature 1: an issue moved into In Progress -> start the pipeline.
Idempotent: if a task already exists for this plane_id, do nothing (no dup,
no analyst restart). This is what makes handle_comment's set_issue_in_progress
safe — by then the task already exists, so the start is skipped.
"""
plane_id = str(data.get("id") or "")
existing = get_task_by_plane_id(plane_id)
if existing:
logger.info(
f"Status->In Progress for {plane_id}: task already exists "
f"(stage={existing.get('stage')}), not restarting"
)
return
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
await start_pipeline(data, project_id)
async def handle_verdict(data: dict, project_id: str, approved: bool):
"""Feature 2 (variant B): a status verdict mirrors the comment verdicts.
Approved status == :approved: comment -> _try_advance_stage.
Rejected status == :rejected: comment -> rollback to previous stage + relaunch
(reason is unknown from a status change; Slava writes it in a separate
comment, so we pass a fixed note).
"""
plane_id = str(data.get("id") or "")
task = get_task_by_plane_id(plane_id)
if not task:
logger.warning(f"Verdict status for {plane_id} but no task found, ignoring")
return
task_id = task["id"]
current_stage = task["stage"]
repo = task["repo"]
work_item_id = task.get("work_item_id", "")
branch = task.get("branch", "")
if approved:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
# Rejected: mirror the :rejected: comment rollback branch.
reason = "(rejected via status, see latest comment)"
await _rollback_stage(
task_id, current_stage, repo, work_item_id, branch, reason
)
async def handle_work_item_created(data: dict, project_id: str = ""):
"""Feature 1: creation does NOT start the pipeline anymore.
The pipeline is started when Slava moves the issue into In Progress
(handle_status_start -> start_pipeline). On creation we only run a SOFT QG-0
sanity check and log the result — NO branch, NO docs, NO analyst, NO task row
— so the issue can sit in the backlog until Slava is ready.
"""
plane_id = data.get("id", "")
name = data.get("name", "untitled")
description = data.get("description_stripped", data.get("description", ""))
priority = data.get("priority", {})
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
errors = _qg0_errors(name, description)
if errors:
logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {errors}")
else:
logger.info(f"work_item.created {plane_id} ('{name}'): in backlog, awaiting In Progress")
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
# the single hardcoded default_repo.
if not project_id:
project_id = data.get("project") or data.get("project_id") or ""
proj = get_project_by_plane_id(project_id)
if not proj:
logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}")
return
repo = proj.repo
plane_project_id = proj.plane_project_id
# QG-0 validation
def _qg0_errors(name: str, description: str) -> list:
"""QG-0 validation: returns a list of human-readable problems (empty = OK)."""
errors = []
if not name or len(name) < 5:
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
@@ -126,6 +233,36 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
if not description or len(description.strip()) < 20:
errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
return errors
async def start_pipeline(data: dict, project_id: str = ""):
"""Feature 1: start the pipeline for an issue (moved to In Progress).
This is the body extracted from the old handle_work_item_created: resolve the
project, run QG-0 (hard — blocks on failure), create the work item id +
branch + initial docs, insert the task row, and enqueue the analyst.
Callers (handle_status_start) already guarantee no existing task for this
plane_id, so this never duplicates.
"""
plane_id = data.get("id", "")
name = data.get("name", "untitled")
description = data.get("description_stripped", data.get("description", ""))
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
# the single hardcoded default_repo.
if not project_id:
project_id = data.get("project") or data.get("project_id") or ""
proj = get_project_by_plane_id(project_id)
if not proj:
logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}")
return
repo = proj.repo
plane_project_id = proj.plane_project_id
# QG-0 validation (hard gate on pipeline start)
errors = _qg0_errors(name, description)
if errors:
# QG-0 failed
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
@@ -148,8 +285,20 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
logger.info(f"QG-0 failed for {plane_id}: {errors}")
return
# Generate work item ID
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
# Generate work item ID.
# M-6: source of truth for the number is the Plane sequence_id. Fetch it by
# issue UUID; if Plane is unavailable, fall back to the DB increment so a
# Plane outage never blocks task creation (autonomy > exact numbering).
from ..plane_sync import fetch_issue_sequence_id
seq = fetch_issue_sequence_id(plane_id, plane_project_id)
if seq is not None:
work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
else:
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
logger.warning(
f"Plane sequence_id unavailable for {plane_id}, "
f"fell back to DB increment: {work_item_id}"
)
# Create slug from name
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
@@ -191,7 +340,7 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
except Exception as e:
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
@@ -222,32 +371,7 @@ async def handle_comment(data: dict, project_id: str = ""):
if ":rejected:" in comment_body:
# Extract reason (text after :rejected:)
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
if current_stage == "analysis":
# Already in analysis — just relaunch analyst with rejection reason
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
else:
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
if prev_stage:
update_task_stage(task_id, prev_stage)
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
notify_stage_change(task_id, current_stage, prev_stage)
plane_notify_stage(work_item_id, current_stage, prev_stage)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}")
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason)
return
if ":approved:" in comment_body:
@@ -292,7 +416,8 @@ async def handle_comment(data: dict, project_id: str = ""):
_pc(
work_item_id,
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430."
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
author="analyst",
)
from ..notifications import send_telegram
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
@@ -308,91 +433,106 @@ async def handle_comment(data: dict, project_id: str = ""):
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
async def _rollback_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
reason: str,
):
"""Shared :rejected: / Rejected-status rollback (Feature 2 variant B).
Both the :rejected: comment and a status change to Rejected funnel here so
the two mechanisms behave identically:
- at analysis: relaunch the analyst with the rejection reason;
- otherwise: roll back to the previous stage and relaunch its agent
(via the existing rollback notify + an enqueue of the prev-stage agent).
"""
if current_stage == "analysis":
# Already in analysis — just relaunch analyst with rejection reason
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
return
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
if not prev_stage:
logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
return
update_task_stage(task_id, prev_stage)
notify_stage_change(task_id, current_stage, prev_stage)
# Feature 3: plane_notify_stage moves the board to the prev stage's status.
plane_notify_stage(work_item_id, current_stage, prev_stage)
# Then put it back to In Progress so the relaunched agent is clearly working.
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
_plane_comment(
work_item_id,
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
author=STAGE_AUTHORS.get(prev_stage, "stream"),
)
# Relaunch the previous stage's agent so the rollback actually re-runs work.
# STAGE_AUTHORS maps a stage directly to the role that OWNS work in it
# (analysis->analyst, architecture->architect, ...), which is exactly the
# agent we must re-run on a rollback into prev_stage.
from ..plane_sync import STAGE_AUTHORS as _STAGE_AUTHORS
prev_agent = _STAGE_AUTHORS.get(prev_stage)
if prev_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n"
f"Revise and improve."
)
new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id)
logger.info(
f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, "
f"enqueued {prev_agent} (job_id={new_job})"
)
else:
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
async def _try_advance_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):
"""Run QG check for current stage and advance if passed."""
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
return
The QG dispatch (including the check_review_approved PR-by-branch logic) and
the advance/launch logic now live in src/stage_engine.advance_stage(), which
is synchronous. We run it off the event loop via asyncio.to_thread so there
is exactly one implementation shared with the launcher.
# Run QG check if one is required
if qg_name:
qg_func = QG_CHECKS.get(qg_name)
if not qg_func:
logger.error(f"QG function '{qg_name}' not found in registry")
return
finished_agent is None on this webhook path (a human :approved: comment, not
a finished agent), so the agent-specific rollback branches inside the engine
intentionally do not trigger — identical to the old plane behavior, which
only ran the QG and either advanced or reported the failure.
"""
import asyncio
from ..stage_engine import advance_stage
# Determine args based on QG function
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
passed, reason = qg_func(repo, work_item_id, branch)
elif qg_name in ("check_ci_green", "check_tests_local"):
passed, reason = qg_func(repo, branch)
elif qg_name == "check_review_approved":
# Find PR number by branch via Gitea API
import httpx as _httpx
from ..config import settings as _s
_owner = _s.gitea_owner
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
_headers = {"Authorization": f"token {_s.gitea_token}"}
try:
_resp = _httpx.get(_url, headers=_headers, timeout=10)
_prs = _resp.json()
_pr_number = None
for _pr in _prs:
if _pr.get("head", {}).get("ref") == branch:
_pr_number = _pr["number"]
break
if _pr_number:
passed, reason = qg_func(repo, _pr_number)
else:
# No open PR but review file exists — check file-based
import os
from ..git_worktree import get_worktree_path as _gwp
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
passed, reason = True, "Review file exists (file-based approval)"
else:
passed, reason = False, "No open PR found and no review file"
except Exception as _e:
passed, reason = False, f"Error finding PR: {_e}"
else:
passed, reason = False, f"Unknown QG: {qg_name}"
if not passed:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# Launch agent associated with the current stage's transition
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
logger.error(f"Agent launch failed: {e}")
await asyncio.to_thread(
advance_stage,
task_id,
current_stage,
repo,
work_item_id,
branch,
None,
)
async def _create_gitea_branch(repo: str, branch: str):

40
tests/conftest.py Normal file
View File

@@ -0,0 +1,40 @@
"""Global pytest fixtures.
test(conftest): mute Telegram in ALL tests to stop prod leakage.
Background: a pytest run on prod was sending REAL Telegram messages to Slava,
because some tests (e.g. test_webhook_dedup advancing a stage) reach
notify_stage_change -> send_telegram, which reads the live .env
telegram_bot_token/chat_id and actually POSTs to Telegram.
This autouse fixture stubs send_telegram to a no-op for every test:
- "src.notifications.send_telegram" is the SOURCE. All the notify_* helpers in
notifications.py call the module-global send_telegram, and every other module
that does a *local* `from .notifications import send_telegram` inside a
function resolves it live at call time -> covered by patching the source.
- "src.stage_engine.send_telegram" is patched too, because stage_engine binds
send_telegram as a MODULE-LEVEL name (from .notifications import send_telegram
at import), so a patch of the source alone would not intercept its 3 direct
calls. webhooks/plane and launcher import it locally inside functions, so the
source patch already covers them; they are patched defensively with
raising=False anyway in case that ever changes.
raising=False so a module that doesn't (yet) expose the name never breaks setup.
"""
import pytest
@pytest.fixture(autouse=True)
def _no_telegram(monkeypatch):
_noop = lambda *a, **k: None # noqa: E731
# Source of truth (covers notifications.notify_* and all local re-imports).
monkeypatch.setattr("src.notifications.send_telegram", _noop, raising=False)
# Module-level binding in stage_engine (and defensive coverage elsewhere).
monkeypatch.setattr("src.stage_engine.send_telegram", _noop, raising=False)
monkeypatch.setattr("src.webhooks.plane.send_telegram", _noop, raising=False)
monkeypatch.setattr("src.agents.launcher.send_telegram", _noop, raising=False)
monkeypatch.setattr("src.queue_worker.send_telegram", _noop, raising=False)
yield

View File

@@ -0,0 +1,92 @@
"""L-2: tests for prune_run_logs (run-log rotation).
Verifies that old / surplus *.log files are removed while fresh logs, non-.log
files, the active log, and subdirectories are left intact. Function is
best-effort and must never raise.
"""
import os
import time
from src.agents.launcher import prune_run_logs
def _touch(path, age_days=0):
with open(path, "w") as f:
f.write("x")
mtime = time.time() - age_days * 86400
os.utime(path, (mtime, mtime))
return path
def test_old_logs_removed_fresh_kept(tmp_path):
runs = tmp_path
fresh = _touch(str(runs / "1.log"), age_days=1)
old = _touch(str(runs / "2.log"), age_days=40)
removed = prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert removed == 1
assert os.path.exists(fresh)
assert not os.path.exists(old)
def test_non_log_files_untouched(tmp_path):
runs = tmp_path
old_log = _touch(str(runs / "stale.log"), age_days=99)
keep_txt = _touch(str(runs / "notes.txt"), age_days=99)
keep_db = _touch(str(runs / "orchestrator.db"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert not os.path.exists(old_log)
assert os.path.exists(keep_txt)
assert os.path.exists(keep_db)
def test_keep_max_retains_newest(tmp_path):
runs = tmp_path
# 5 logs, all recent (within keep_days), increasing age 0..4 days.
paths = []
for i in range(5):
paths.append(_touch(str(runs / f"{i}.log"), age_days=i))
removed = prune_run_logs(str(runs), keep_days=365, keep_max=2)
# Only the 2 newest (age 0, 1) survive.
assert removed == 3
assert os.path.exists(paths[0])
assert os.path.exists(paths[1])
for p in paths[2:]:
assert not os.path.exists(p)
def test_active_log_never_removed(tmp_path):
runs = tmp_path
active = _touch(str(runs / "active.log"), age_days=99)
other = _touch(str(runs / "other.log"), age_days=99)
removed = prune_run_logs(
str(runs), keep_days=30, keep_max=500, active_paths=[active]
)
assert removed == 1
assert os.path.exists(active)
assert not os.path.exists(other)
def test_subdirs_untouched(tmp_path):
runs = tmp_path
sub = runs / "sub.log"
sub.mkdir() # a directory that happens to end in .log
old_log = _touch(str(runs / "old.log"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert sub.is_dir()
assert not os.path.exists(old_log)
def test_missing_dir_is_noop(tmp_path):
missing = tmp_path / "does-not-exist"
# Must not raise.
assert prune_run_logs(str(missing)) == 0

187
tests/test_m6_sequence.py Normal file
View File

@@ -0,0 +1,187 @@
"""M-6: work_item_id derived from Plane sequence_id (source of truth = Plane).
Covers:
* fetch_issue_sequence_id returns int on a valid Plane response (mocked httpx);
* returns None on network error / missing field WITHOUT raising;
* handle_work_item_created uses prefix-NNN when seq is available, and falls
back to get_next_work_item_id when seq is None (Plane down => autonomy);
* find_issue_id no longer hardcodes 'ET-' and matches an arbitrary prefix
(e.g. ORCH-005) by sequence_id.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_m6.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import patch, AsyncMock, MagicMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
import src.plane_sync as plane_sync # noqa: E402
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}},'
f' {{"plane_project_id": "{ORCH_PLANE_ID}", "repo": "orchestrator",'
f' "work_item_prefix": "ORCH", "name": "orchestrator"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _mock_resp(json_body, status=200):
m = MagicMock()
m.json.return_value = json_body
m.raise_for_status.return_value = None
if status >= 400:
def _raise():
raise RuntimeError(f"HTTP {status}")
m.raise_for_status.side_effect = _raise
return m
# ---------------------------------------------------------------------------
# fetch_issue_sequence_id
# ---------------------------------------------------------------------------
def test_fetch_sequence_id_returns_int():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"sequence_id": 42})):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq == 42
assert isinstance(seq, int)
def test_fetch_sequence_id_network_error_returns_none():
with patch.object(plane_sync.httpx, "get", side_effect=RuntimeError("connection refused")):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq is None # must not raise
def test_fetch_sequence_id_missing_field_returns_none():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"error": "not found"})):
seq = plane_sync.fetch_issue_sequence_id("missing-uuid", "proj-uuid")
assert seq is None
# ---------------------------------------------------------------------------
# handle_work_item_created: seq available -> prefix-NNN
# ---------------------------------------------------------------------------
# Feature 1: pipeline starts on a status change to In Progress, not on creation.
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "issue",
"action": "updated",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
},
},
)
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=7)
def test_created_uses_plane_sequence_id(mock_fetch, mock_branch, mock_docs, mock_launcher):
mock_launcher.launch.return_value = 1
resp = _post("seq-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='seq-issue'").fetchone()
conn.close()
assert task is not None
assert task["work_item_id"] == "ORCH-007"
mock_fetch.assert_called_once()
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
@patch("src.webhooks.plane.get_next_work_item_id", return_value="ORCH-099")
def test_created_falls_back_to_db_when_plane_down(
mock_next, mock_fetch, mock_branch, mock_docs, mock_launcher
):
"""Plane unavailable (seq=None) => fall back to DB increment; task still created."""
mock_launcher.launch.return_value = 1
resp = _post("fallback-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='fallback-issue'").fetchone()
conn.close()
assert task is not None # autonomy: Plane down does not block creation
assert task["work_item_id"] == "ORCH-099"
mock_next.assert_called_once()
# ---------------------------------------------------------------------------
# find_issue_id: no hardcoded ET- prefix, matches arbitrary prefix by seq
# ---------------------------------------------------------------------------
def test_find_issue_id_matches_arbitrary_prefix_by_sequence():
"""ORCH-005 must resolve via the issue whose sequence_id == 5 (no ET- assumption)."""
issues = {"results": [
{"id": "uuid-a", "sequence_id": 3, "name": "something"},
{"id": "uuid-b", "sequence_id": 5, "name": "ORCH-005: target"},
{"id": "uuid-c", "sequence_id": 9, "name": "other"},
]}
# No DB row for this work_item_id => goes to the Plane API search branch.
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ORCH-005", project_id="proj-uuid")
assert found == "uuid-b"
def test_find_issue_id_matches_et_prefix_too():
"""Backward compat: ET-002 still resolves by sequence_id == 2."""
issues = {"results": [
{"id": "uuid-x", "sequence_id": 2, "name": "ET item"},
{"id": "uuid-y", "sequence_id": 7, "name": "other"},
]}
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ET-002", project_id="proj-uuid")
assert found == "uuid-x"

View File

@@ -0,0 +1,99 @@
"""Tests for per-agent Plane comment authorship (feat: per-agent bot author).
Covers:
* _headers_for: role -> bot token; None/unknown/empty token -> shared fallback.
* add_comment: author is propagated into the POST headers; no author keeps
backward-compatible behaviour (shared orchestrator token).
GET/PATCH calls are intentionally NOT covered here: they stay on the shared
token by design and are unchanged by this feature.
"""
import os
# Set env defaults before importing app modules (same convention as the other
# suites) so config/settings load cleanly without a real .env.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "shared-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from unittest.mock import patch, MagicMock # noqa: E402
from src import plane_sync # noqa: E402
# --------------------------------------------------------------------------- #
# _headers_for
# --------------------------------------------------------------------------- #
def test_headers_for_known_role_uses_bot_token():
"""A known role with a configured token -> that bot's X-API-Key."""
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"analyst": "analyst-tok"}, clear=False):
assert plane_sync._headers_for("analyst") == {"X-API-Key": "analyst-tok"}
def test_headers_for_none_falls_back_to_shared():
"""author=None -> shared orchestrator headers."""
assert plane_sync._headers_for(None) is plane_sync.PLANE_HEADERS
def test_headers_for_unknown_role_falls_back_to_shared():
"""Unknown role -> shared orchestrator headers."""
assert plane_sync._headers_for("nope") is plane_sync.PLANE_HEADERS
def test_headers_for_empty_token_falls_back_to_shared():
"""Known role but empty/unconfigured token -> shared orchestrator headers."""
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"tester": ""}, clear=False):
assert plane_sync._headers_for("tester") is plane_sync.PLANE_HEADERS
def test_headers_for_empty_string_author_falls_back_to_shared():
"""author='' -> shared orchestrator headers."""
assert plane_sync._headers_for("") is plane_sync.PLANE_HEADERS
# --------------------------------------------------------------------------- #
# add_comment
# --------------------------------------------------------------------------- #
def _mock_post_ok():
resp = MagicMock()
resp.raise_for_status.return_value = None
return resp
def test_add_comment_with_author_posts_with_bot_headers():
"""add_comment(author='developer') -> httpx.post called with the developer
bot's X-API-Key header."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.dict(plane_sync.PLANE_BOT_TOKENS, {"developer": "dev-tok"}, clear=False), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello", author="developer")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] == {"X-API-Key": "dev-tok"}
def test_add_comment_without_author_uses_shared_token():
"""add_comment without author -> shared orchestrator headers (backward
compatible)."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
def test_add_comment_unknown_author_uses_shared_token():
"""add_comment with an unknown role -> shared orchestrator headers."""
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
plane_sync.add_comment("ET-001", "hello", author="ghost")
assert mock_post.called
_, kwargs = mock_post.call_args
assert kwargs["headers"] is plane_sync.PLANE_HEADERS

View File

@@ -73,16 +73,24 @@ def setup(monkeypatch):
os.unlink(_test_db)
# Feature 1: the pipeline now starts on a status change to In Progress (not on
# creation). _post_created drives that status-change event so these ORCH-6
# routing tests still exercise task creation through the new trigger.
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
def _post_created(plane_project_id, plane_id="wi-1", name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "work_item.created",
"event": "issue",
"action": "updated",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
},
},
)

395
tests/test_stage_engine.py Normal file
View File

@@ -0,0 +1,395 @@
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
These verify the MERGED behavior of what used to be two diverged
_try_advance_stage implementations (launcher sync + plane async):
* happy-path advance for every stage launches the CORRECT agent
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
* a QG failure does not advance;
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
* developer retries > 3 -> telegram alert, no further enqueue;
* tester FAIL -> rollback to development + enqueue developer;
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
* launcher AND plane both delegate to the engine.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
the engine runs against a real isolated sqlite DB.
"""
import os
import tempfile
import pytest
# Isolated test DB (same convention as the other suites).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock, patch # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
from src.stages import get_agent_for_stage # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
"""Fresh isolated DB per test."""
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
"""Mock all Plane/Telegram/notification side effects in the engine.
Everything imported into src.stage_engine that touches the network or sends
a message becomes a no-op MagicMock so tests are deterministic and offline.
"""
for name in (
"notify_stage_change",
"notify_qg_failure",
"notify_approve_requested",
"send_telegram",
"plane_notify_stage",
"plane_notify_qg",
"plane_add_comment",
"set_issue_in_review",
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _add_developer_runs(task_id, n):
conn = get_db()
for _ in range(n):
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(task_id,),
)
conn.commit()
conn.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
"""The fixed agent-selection: when advancing FROM current_stage, the engine
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
"""
@pytest.mark.parametrize(
"current_stage,expected_next,expected_agent",
[
("architecture", "development", "developer"),
("development", "review", "reviewer"),
("review", "testing", "tester"),
("testing", "deploy", "deployer"),
],
)
def test_advance_launches_current_stage_agent(
self, monkeypatch, current_stage, expected_next, expected_agent
):
# All QG checks pass for this happy-path suite.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task(current_stage)
res = advance_stage(
task_id, current_stage, "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == expected_next
assert _stage(task_id) == expected_next
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
assert expected_agent == get_agent_for_stage(current_stage)
assert res.enqueued_agent == expected_agent
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == expected_agent
def test_deploy_to_done_no_agent(self, monkeypatch):
"""deploy -> done advances but launches no agent (terminal-ish)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is True
assert _stage(task_id) == "done"
assert res.enqueued_agent is None
assert _jobs() == []
def test_done_is_terminal(self):
task_id = _make_task("done")
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is False
assert _stage(task_id) == "done"
# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
def test_qg_fail_keeps_stage(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.qg_passed is False
assert _stage(task_id) == "architecture"
assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
)
task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert stage_engine.notify_qg_failure.called
assert stage_engine.plane_notify_qg.called
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
"""finished_agent set -> NO generic QG notification (launcher parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
_add_developer_runs(task_id, 3) # already at the max
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.send_telegram.called
# No new developer job enqueued past the retry cap.
assert _jobs() == []
# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
)
task_id = _make_task("testing")
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
)
task_id = _make_task("testing")
_add_developer_runs(task_id, 3)
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
# 10-conflict.md must exist in the worktree path the engine inspects.
wt = tmp_path / "wt"
conflict_dir = wt / "docs" / "work-items" / "ET-001"
conflict_dir.mkdir(parents=True)
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to == "analysis"
assert _stage(task_id) == "analysis"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "analyst"
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
wt = tmp_path / "wt"
(wt / "docs").mkdir(parents=True)
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to is None
assert _stage(task_id) == "architecture"
assert _jobs() == []
# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
)
task_id = _make_task("analysis")
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst")
assert res.advanced is False
assert _stage(task_id) == "analysis"
assert stage_engine.set_issue_in_review.called
assert stage_engine.notify_approve_requested.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
def test_launcher_calls_engine(self):
from src.agents.launcher import AgentLauncher
task_id = _make_task("development", branch="feature/ET-777-deleg")
with patch("src.stage_engine.advance_stage") as m:
AgentLauncher()._try_advance_stage(
run_id=1, agent="developer", repo="enduro-trails",
branch="feature/ET-777-deleg",
)
m.assert_called_once()
kwargs = m.call_args.kwargs
assert kwargs["task_id"] == task_id
assert kwargs["current_stage"] == "development"
assert kwargs["finished_agent"] == "developer"
def test_plane_calls_engine(self):
import asyncio
from src.webhooks import plane as plane_mod
with patch("src.stage_engine.advance_stage") as m:
asyncio.run(
plane_mod._try_advance_stage(
task_id=5, current_stage="analysis", repo="enduro-trails",
work_item_id="ET-001", branch="feature/ET-001-x",
)
)
m.assert_called_once()
# plane passes positional args; finished_agent (last positional) is None.
args = m.call_args.args
assert args[0] == 5
assert args[1] == "analysis"
assert args[-1] is None

View File

@@ -0,0 +1,94 @@
"""Feature 3: stage visibility on the Plane board.
* PLANE_STATES carries the 6 new per-stage / verdict UUIDs.
* STAGE_TO_STATE maps architecture/development/review/testing to their
dedicated board statuses (not all -> In Progress anymore).
* set_issue_stage_state(work_item_id, stage) PATCHes the correct state UUID
for a visible stage, and is a no-op for stages without one (analysis/deploy).
* Needs Input / In Review / Blocked remain higher priority: their explicit
setters use their own state, never overwritten by the stage map.
httpx is mocked; no network.
"""
import os
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from unittest.mock import patch, MagicMock # noqa: E402
from src import plane_sync as PS # noqa: E402
EXPECTED_UUIDS = {
"architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
"development": "9920609b-f140-4e46-ab95-89acda8412c8",
"review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
"testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
"approved": "a519a341-dada-4a91-8910-7604f82b79c5",
"rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
}
def test_plane_states_has_new_uuids():
for key, uuid in EXPECTED_UUIDS.items():
assert PS.PLANE_STATES[key] == uuid
def test_stage_to_state_maps_visible_stages():
assert PS.STAGE_TO_STATE["architecture"] == EXPECTED_UUIDS["architecture"]
assert PS.STAGE_TO_STATE["development"] == EXPECTED_UUIDS["development"]
assert PS.STAGE_TO_STATE["review"] == EXPECTED_UUIDS["review"]
assert PS.STAGE_TO_STATE["testing"] == EXPECTED_UUIDS["testing"]
# analysis / deploy stay on In Progress; done stays Done.
assert PS.STAGE_TO_STATE["analysis"] == PS.PLANE_STATES["in_progress"]
assert PS.STAGE_TO_STATE["deploy"] == PS.PLANE_STATES["in_progress"]
assert PS.STAGE_TO_STATE["done"] == PS.PLANE_STATES["done"]
def _patch_resolution(monkey_targets):
"""Helper: patch find_issue_id + _resolve_project_id to skip the DB/network."""
return monkey_targets
@patch("src.plane_sync.httpx.patch")
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
def test_set_issue_stage_state_patches_correct_uuid(mock_proj, mock_find, mock_patch):
resp = MagicMock(); resp.raise_for_status.return_value = None
mock_patch.return_value = resp
PS.set_issue_stage_state("ET-1", "development")
# the PATCH carried the development state UUID
_, kwargs = mock_patch.call_args
assert kwargs["json"]["state"] == EXPECTED_UUIDS["development"]
@patch("src.plane_sync.httpx.patch")
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
def test_set_issue_stage_state_noop_for_analysis(mock_proj, mock_find, mock_patch):
# analysis has no dedicated board status -> no PATCH at all.
PS.set_issue_stage_state("ET-1", "analysis")
mock_patch.assert_not_called()
PS.set_issue_stage_state("ET-1", "deploy")
mock_patch.assert_not_called()
@patch("src.plane_sync.httpx.patch")
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
def test_priority_states_use_their_own_uuid(mock_proj, mock_find, mock_patch):
"""Needs Input / In Review / Blocked are set explicitly and take priority."""
resp = MagicMock(); resp.raise_for_status.return_value = None
mock_patch.return_value = resp
PS.set_issue_needs_input("ET-1")
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["needs_input"]
PS.set_issue_in_review("ET-1")
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["in_review"]
PS.set_issue_blocked("ET-1")
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["blocked"]

View File

@@ -0,0 +1,150 @@
"""Feature 1: pipeline starts on status -> In Progress, not on creation.
* work_item.created / issue created -> NO task, NO branch, NO analyst.
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
* a second In Progress update for the same issue -> NO duplicate, NO restart
(protects handle_comment, which also flips issues to In Progress).
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _created(plane_id="st-created"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "created",
"data": {
"id": plane_id, "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"},
},
})
def _to_in_progress(plane_id="st-1"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
def _count(plane_id):
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
conn.close()
return n
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue):
resp = _created("st-created")
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
# No task, no branch, no analyst enqueue.
assert _count("st-created") == 0
mock_branch.assert_not_called()
mock_enqueue.assert_not_called()
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue):
mock_enqueue.return_value = 1
resp = _to_in_progress("st-1")
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
assert _count("st-1") == 1
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone()
conn.close()
assert task["stage"] == "analysis"
assert task["repo"] == "enduro-trails"
mock_branch.assert_called_once()
# analyst enqueued exactly once
assert mock_enqueue.call_count == 1
assert mock_enqueue.call_args.args[0] == "analyst"
@patch("src.webhooks.plane.enqueue_job")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue):
mock_enqueue.return_value = 1
_to_in_progress("st-2")
assert _count("st-2") == 1
assert mock_enqueue.call_count == 1
# Second In Progress update (e.g. handle_comment re-set the status). Use a
# DISTINCT body (different activity old_value) so webhook dedup does NOT
# short-circuit it — this exercises the existing-task idempotency guard in
# handle_status_start, not the delivery-dedup layer.
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "st-2", "name": "A valid backlog item title",
"description_stripped": "A sufficiently long description for QG-0.",
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"},
})
assert resp.status_code == 200
assert _count("st-2") == 1 # still exactly one task
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued

176
tests/test_usage.py Normal file
View File

@@ -0,0 +1,176 @@
"""Feature 4: token / cost accounting tests.
Covers:
* parse_usage_from_text on a REAL claude --output-format json result blob
(captured live from CLI 2.1.142), including a leading text line.
* parse on garbage / missing JSON -> None (never raises).
* record_usage writes the columns; NULLs when usage is None.
* fmt_tokens / fmt_cost formatting.
* usage_comment string format.
* task_usage_summary / task_summary_comment aggregate over agent_runs.
DB is an isolated temp file; no network or subprocess.
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db")
os.environ["ORCH_DB_PATH"] = _test_db
import pytest # noqa: E402
from src import db as db_module # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import usage as U # noqa: E402
# Real claude --output-format json result object (captured from CLI 2.1.142).
REAL_RESULT_JSON = (
'{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,'
'"num_turns":1,"result":"Hi!","session_id":"abc",'
'"total_cost_usd":0.0560175,'
'"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,'
'"cache_read_input_tokens":18500,"output_tokens":12100,'
'"service_tier":"standard"},'
'"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},'
'"permission_denials":[]}'
)
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
# get_db() reads settings.db_path live; pin it to our isolated DB.
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
# --------------------------------------------------------------------------- #
# parsing
# --------------------------------------------------------------------------- #
def test_parse_real_result_json():
u = U.parse_usage_from_text(REAL_RESULT_JSON)
assert u is not None
assert u["input_tokens"] == 45231
assert u["output_tokens"] == 12100
assert u["cache_read_tokens"] == 18500
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
def test_parse_with_leading_text():
"""The agent may print text before the trailing JSON; we still find it."""
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
u = U.parse_usage_from_text(text)
assert u is not None
assert u["input_tokens"] == 45231
assert u["output_tokens"] == 12100
def test_parse_garbage_returns_none():
assert U.parse_usage_from_text("not json at all { broken") is None
assert U.parse_usage_from_text("") is None
assert U.parse_usage_from_text(None) is None
def test_parse_json_without_usage_returns_none():
assert U.parse_usage_from_text('{"hello":"world"}') is None
def test_parse_from_log_missing_file_returns_none():
assert U.parse_usage_from_log("/no/such/file.log") is None
# --------------------------------------------------------------------------- #
# record_usage
# --------------------------------------------------------------------------- #
def _new_run(agent="developer", task_id=1):
conn = get_db()
cur = conn.execute("INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", (task_id, agent))
rid = cur.lastrowid
conn.commit()
conn.close()
return rid
def test_record_usage_writes_columns():
rid = _new_run()
u = U.parse_usage_from_text(REAL_RESULT_JSON)
U.record_usage(rid, u)
conn = get_db()
row = conn.execute(
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
"FROM agent_runs WHERE id=?", (rid,)
).fetchone()
conn.close()
assert row["input_tokens"] == 45231
assert row["output_tokens"] == 12100
assert row["cache_read_tokens"] == 18500
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
def test_record_usage_none_writes_nulls():
rid = _new_run()
U.record_usage(rid, None) # must not raise
conn = get_db()
row = conn.execute("SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (rid,)).fetchone()
conn.close()
assert row["input_tokens"] is None
assert row["cost_usd"] is None
# --------------------------------------------------------------------------- #
# formatting
# --------------------------------------------------------------------------- #
def test_fmt_tokens():
assert U.fmt_tokens(6) == "6"
assert U.fmt_tokens(1234) == "1.2k"
assert U.fmt_tokens(45231) == "45.2k"
assert U.fmt_tokens(2_500_000) == "2.5M"
assert U.fmt_tokens(None) == "0"
def test_fmt_cost():
assert U.fmt_cost(0.21) == "$0.21"
assert U.fmt_cost(0.0560175) == "$0.06"
assert U.fmt_cost(None) == "$0.00"
def test_usage_comment_format():
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
c = U.usage_comment("developer", u)
assert "Developer" in c
assert "45.2k in" in c
assert "12.1k out" in c
assert "$0.21" in c
# --------------------------------------------------------------------------- #
# task summary
# --------------------------------------------------------------------------- #
def test_task_summary_aggregates_over_agents():
# two runs for the same task: developer + tester
for agent, ti, to, cost in [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]:
rid = _new_run(agent=agent, task_id=42)
U.record_usage(rid, {"input_tokens": ti, "output_tokens": to,
"cache_read_tokens": 0, "cost_usd": cost})
s = U.task_usage_summary(42)
assert s["total_in"] == 1500
assert s["total_out"] == 300
assert abs(s["total_cost"] - 0.15) < 1e-9
agents = {a for a, *_ in s["per_agent"]}
assert agents == {"developer", "tester"}
comment = U.task_summary_comment(42)
assert "1.5k" in comment # total in
assert "$0.15" in comment # total cost
assert "Developer" in comment
assert "Tester" in comment

View File

@@ -0,0 +1,140 @@
"""Feature 2 (variant B): verdict statuses Approved / Rejected.
* issue updated -> Approved : calls _try_advance_stage (== :approved: comment).
* issue updated -> Rejected : calls _rollback_stage (== :rejected: comment).
* the :approved: / :rejected: COMMENT mechanisms still work (both paths live).
We mock the shared engine entry points (_try_advance_stage / _rollback_stage)
and assert they fire for both the status and the comment trigger, so the two
mechanisms are proven to funnel into the same logic.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_verdict.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5"
REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
# Seed a task at the 'review' stage for plane_id 'v-1'.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
"VALUES (?, ?, ?, ?, ?, ?)",
("v-1", "ET-500", "enduro-trails", "feature/ET-500-x", "review", "v-1"),
)
conn.commit()
conn.close()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _status(state_id, plane_id="v-1", old="prev"):
return client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": plane_id, "name": "Verdict task", "project": ENDURO_PLANE_ID,
"state": {"id": state_id, "name": "X", "group": "started"},
},
"activity": {"field": "state", "new_value": state_id, "old_value": old},
})
def _comment(text, plane_id="v-1"):
return client.post("/webhook/plane", json={
"event": "issue_comment", "action": "created",
"data": {"work_item_id": plane_id, "comment_stripped": text,
"project": ENDURO_PLANE_ID},
})
# --------------------------------------------------------------------------- #
# Approved status -> advance
# --------------------------------------------------------------------------- #
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_approved_status_advances(mock_advance, mock_sip):
resp = _status(APPROVED)
assert resp.status_code == 200
mock_advance.assert_awaited_once()
# advanced the right task (ET-500 at review)
args = mock_advance.call_args.args
assert "ET-500" in args # work_item_id is passed positionally
@patch("src.plane_sync.set_issue_in_progress")
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_approved_comment_still_advances(mock_advance, mock_sip):
resp = _comment(":approved:")
assert resp.status_code == 200
mock_advance.assert_awaited_once()
# --------------------------------------------------------------------------- #
# Rejected status -> rollback
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_status_rolls_back(mock_rollback):
resp = _status(REJECTED)
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
# reason note for a status reject (no inline reason available)
kwargs_reason = mock_rollback.call_args.args[-1]
assert "rejected via status" in kwargs_reason
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
def test_rejected_comment_still_rolls_back(mock_rollback):
resp = _comment(":rejected: bad ADR")
assert resp.status_code == 200
mock_rollback.assert_awaited_once()
reason = mock_rollback.call_args.args[-1]
assert "bad ADR" in reason
# --------------------------------------------------------------------------- #
# Unknown verdict status -> no-op
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
def test_other_status_no_verdict_action(mock_advance, mock_rollback):
# In Review status is not a verdict -> neither advance nor rollback.
resp = _status("38fb1f64-aa1e-48a3-92e0-0b109679046b") # in_review
assert resp.status_code == 200
mock_advance.assert_not_called()
mock_rollback.assert_not_called()

284
tests/test_webhook_dedup.py Normal file
View File

@@ -0,0 +1,284 @@
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
A retried/replayed webhook delivery must be processed exactly once. We mock
enqueue_job (imported into the gitea/plane module namespaces) and assert its
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
guarantee by re-enabling the secret.
"""
import os
import tempfile
from unittest.mock import patch, AsyncMock
import pytest
# Override DB path + project registry BEFORE importing app (same pattern as
# tests/test_webhooks.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import db as db_module # noqa: E402
from src.webhooks import gitea as gitea_mod # noqa: E402
from src.webhooks import plane as plane_mod # noqa: E402
from src import projects as projects_mod # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
# settings is a process-wide singleton; another test module may have fixed
# settings.db_path to its own file at import time. get_db() reads it live, so
# pin it to OUR db for the duration of each test here.
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
@pytest.fixture(autouse=True)
def proj_registry():
"""Pin the shared project registry to proj-1/enduro-trails.
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
built at import; test_projects.py rebuilds it via reload_projects(), which can
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
ignore our fixtures. Force ours for each test, then rebuild after.
"""
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
projects_mod.reload_projects()
yield
projects_mod.reload_projects()
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
"""Bypass HMAC so dedup behavior (not signing) is under test.
settings is shared, so override the secret on the module-level settings that
each verify_* function reads.
"""
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
yield
client = TestClient(app)
def _events_count():
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
return n
# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------
def test_migration_adds_delivery_id_and_index():
"""events has delivery_id + a partial unique index idx_events_delivery."""
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
conn.close()
assert "delivery_id" in cols
assert "idx_events_delivery" in idxs
def test_migration_on_old_db_without_column_does_not_crash():
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
if os.path.exists(_test_db):
os.unlink(_test_db)
import sqlite3
conn = sqlite3.connect(_test_db)
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
conn.executescript(
"""
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
source TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
"""
)
conn.commit()
conn.close()
# Should add the column + index without raising and keep the legacy rows.
init_db()
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
assert "delivery_id" in cols
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
# ---------------------------------------------------------------------------
# Gitea dedup
# ---------------------------------------------------------------------------
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
# Task at architecture so the ADR push would enqueue.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
)
conn.commit()
conn.close()
body = {
"ref": "refs/heads/feature/ET-100-x",
"repository": {"name": "enduro-trails"},
"commits": [
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
],
}
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
# Same delivery id again -> duplicate, no new enqueue, no new event row.
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
r2 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "accepted"
assert _events_count() == 2
def test_gitea_fallback_hash_when_no_delivery_header():
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# Plane dedup
# ---------------------------------------------------------------------------
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.
Feature 1: the pipeline now starts on a status change to In Progress, not on
creation, so this drives the dedup test with an 'issue updated' event.
"""
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
body = {
"event": "issue",
"action": "updated",
"data": {
"id": "pd-001",
"name": "Dedup plane task",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-1",
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1 # not re-enqueued
assert _events_count() == 1
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
body = {
"event": "work_item.created",
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "ignored"
# Event WAS logged (dedup happens before the project filter), so a retry of the
# SAME body is a duplicate, not re-evaluated.
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# HMAC still guarded (acceptance #4) — independent of the dedup path
# ---------------------------------------------------------------------------
def test_gitea_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/gitea",
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
)
assert r.status_code == 401
def test_plane_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/plane",
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
headers={"X-Plane-Signature": "deadbeef"},
)
assert r.status_code == 401