Compare commits
31 Commits
feature/OR
...
fix/ci-fai
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a285de11d | ||
| 7922f6b67b | |||
|
|
e15d339b14 | ||
| 994f73a78e | |||
|
|
90c9ffe839 | ||
| b6aa107f93 | |||
|
|
0b8013cb06 | ||
| b01643fcc3 | |||
|
|
ca63bc26bb | ||
| dce9ac806b | |||
|
|
a9cdb17614 | ||
|
|
96c5e6b2f9 | ||
|
|
b91be74692 | ||
| 2d392b6fc7 | |||
|
|
857bad314c | ||
|
|
c4be50ee20 | ||
|
|
6b3e144949 | ||
| cd73c75cda | |||
|
|
c69e11348b | ||
|
|
ac9f5a05a6 | ||
|
|
fa746105fd | ||
| 4773137b52 | |||
|
|
7fd6529a35 | ||
|
|
9a702a0216 | ||
|
|
38a741d24e | ||
|
|
09b1c5e1b9 | ||
|
|
a4668c0303 | ||
| e9fd30528f | |||
|
|
d305521067 | ||
|
|
30d6dd0557 | ||
| 12e2691a24 |
@@ -209,9 +209,15 @@ class AgentLauncher:
|
||||
|
||||
# No git fetch/checkout here: ensure_worktree() already put the worktree on
|
||||
# the right branch. The agent simply runs inside its isolated work_path.
|
||||
# Feature 4 (token usage): --output-format json makes claude emit a single
|
||||
# result JSON (with usage + total_cost_usd) at the end of stdout. The log
|
||||
# still captures it; _monitor_agent parses the trailing JSON after the run
|
||||
# to record per-agent tokens/cost. _monitor_agent's failure handling keys
|
||||
# off the process exit_code (not stdout shape), so this is safe.
|
||||
cmd = (
|
||||
f'cd {work_path} && '
|
||||
f'{self.CLAUDE_BIN} --print '
|
||||
f'--output-format json '
|
||||
f'{model_flag}'
|
||||
f'"$(cat {task_file})" '
|
||||
f'--system-prompt "$(cat {system_prompt})" '
|
||||
@@ -400,6 +406,17 @@ class AgentLauncher:
|
||||
|
||||
notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s)
|
||||
|
||||
# Feature 4: parse token usage / cost from the (json) run log and record
|
||||
# it on the agent_runs row. Never fatal — a garbled/missing JSON records
|
||||
# NULLs and logs a warning so a broken run can't crash the monitor.
|
||||
try:
|
||||
from ..usage import parse_usage_from_log, record_usage
|
||||
_usage = parse_usage_from_log(output_path) if output_path else None
|
||||
record_usage(run_id, _usage)
|
||||
except Exception as e:
|
||||
logger.warning(f"run_id={run_id}: usage accounting failed: {e}")
|
||||
_usage = None
|
||||
|
||||
# Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4),
|
||||
# NOT in the shared /repos/<repo>. The worktree is already on `branch`
|
||||
# (ensure_worktree did the checkout), so no checkout is needed here.
|
||||
@@ -471,7 +488,8 @@ class AgentLauncher:
|
||||
set_issue_blocked(_wid)
|
||||
plane_add_comment(
|
||||
_wid,
|
||||
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="deployer",
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")
|
||||
@@ -489,6 +507,14 @@ class AgentLauncher:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log")
|
||||
|
||||
# Feature 4: post the per-agent usage comment under that agent's bot, and
|
||||
# — for the deployer finishing the task — the per-task usage summary.
|
||||
if exit_code == 0:
|
||||
try:
|
||||
self._post_usage_comments(run_id, agent, repo, branch, _usage)
|
||||
except Exception as e:
|
||||
logger.warning(f"run_id={run_id}: usage comment failed: {e}")
|
||||
|
||||
# Auto-advance stage if agent finished successfully and QG passes
|
||||
if exit_code == 0:
|
||||
self._try_advance_stage(run_id, agent, repo, branch)
|
||||
@@ -653,6 +679,32 @@ class AgentLauncher:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
|
||||
def _post_usage_comments(self, run_id, agent, repo, branch, usage):
|
||||
"""Feature 4: post the per-agent usage comment (and Deployer summary).
|
||||
|
||||
- Always (on success, with a work_item_id): a per-agent finish comment
|
||||
with token/cost, authored by the finishing agent's Plane bot.
|
||||
- When the deployer finishes: also a per-task summary (SUM over
|
||||
agent_runs GROUP BY agent), authored by the deployer.
|
||||
"""
|
||||
from ..usage import usage_comment, task_summary_comment
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
|
||||
(repo, branch),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if not row:
|
||||
return
|
||||
task_id, work_item_id = row[0], row[1]
|
||||
if not work_item_id:
|
||||
return
|
||||
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
|
||||
if agent == "deployer":
|
||||
plane_add_comment(
|
||||
work_item_id, task_summary_comment(task_id), author="deployer"
|
||||
)
|
||||
|
||||
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
|
||||
@@ -9,8 +9,20 @@ class Settings(BaseSettings):
|
||||
plane_webhook_secret: str = ""
|
||||
plane_project_id: str = ""
|
||||
|
||||
# Per-agent Plane bot tokens (feat: per-agent comment authorship).
|
||||
# When set, add_comment posts under the matching bot so Plane shows the
|
||||
# real author (Analyst/Architect/...). Empty -> fallback to plane_api_token.
|
||||
plane_bot_analyst: str = ""
|
||||
plane_bot_architect: str = ""
|
||||
plane_bot_developer: str = ""
|
||||
plane_bot_reviewer: str = ""
|
||||
plane_bot_tester: str = ""
|
||||
plane_bot_deployer: str = ""
|
||||
plane_bot_stream: str = ""
|
||||
|
||||
# Gitea
|
||||
gitea_url: str = "http://localhost:3000"
|
||||
gitea_public_url: str = "" # external URL for clickable links in comments; falls back to gitea_url
|
||||
gitea_token: str = ""
|
||||
gitea_webhook_secret: str = ""
|
||||
gitea_owner: str = "admin"
|
||||
|
||||
62
src/db.py
62
src/db.py
@@ -77,6 +77,13 @@ def init_db():
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
||||
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
||||
)
|
||||
# Feature 4 (token usage): per-run token / cost accounting. Parsed from the
|
||||
# claude --output-format json result by the launcher monitor. Idempotent
|
||||
# ALTERs (no-op once the columns exist) so this is safe on the live prod DB.
|
||||
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -152,6 +159,44 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
|
||||
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
|
||||
|
||||
M-6 derives the work_item_id from the Plane sequence_id. That number can
|
||||
collide (e.g. an issue was deleted and the sequence reused, or two issues
|
||||
map to the same number) -> the SAME ET-NNN gets handed to two different
|
||||
tasks, which then physically share a branch/worktree slug prefix and step on
|
||||
each other (see ET-006: task 8 and task 25).
|
||||
|
||||
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
|
||||
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
|
||||
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
|
||||
is found and return that instead. If the derived id is free, it is returned
|
||||
unchanged.
|
||||
"""
|
||||
if not work_item_id or "-" not in work_item_id:
|
||||
return work_item_id
|
||||
prefix, num_str = work_item_id.rsplit("-", 1)
|
||||
try:
|
||||
num = int(num_str)
|
||||
except ValueError:
|
||||
return work_item_id
|
||||
width = len(num_str)
|
||||
|
||||
conn = get_db()
|
||||
try:
|
||||
candidate = work_item_id
|
||||
while conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
|
||||
(repo, candidate),
|
||||
).fetchone() is not None:
|
||||
num += 1
|
||||
candidate = f"{prefix}-{num:0{width}d}"
|
||||
return candidate
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-5 (M-7): idempotent webhook event logging
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -306,6 +351,23 @@ def mark_job(
|
||||
conn.close()
|
||||
|
||||
|
||||
def has_active_job_for_task(task_id: int) -> bool:
|
||||
"""True if the task already has a queued or running job.
|
||||
|
||||
Used by the status-only verdict model (handle_status_start) to guard against
|
||||
double-launching an agent when a duplicate In Progress webhook arrives or a
|
||||
job is still in flight. The events de-dup absorbs identical webhook bodies;
|
||||
this guards against distinct webhooks while a job is pending/running.
|
||||
"""
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
return row is not None
|
||||
|
||||
|
||||
def count_running_jobs() -> int:
|
||||
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||||
conn = get_db()
|
||||
|
||||
@@ -15,6 +15,44 @@ EMOJI_DONE = "\u2705" # task completed
|
||||
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
|
||||
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
|
||||
WORKSPACE = settings.plane_workspace_slug
|
||||
|
||||
# feat(plane): per-agent comment authorship.
|
||||
# Map an agent role -> its dedicated Plane bot token (read from config / env).
|
||||
# When the token is present, add_comment() POSTs under that bot so Plane shows
|
||||
# the real author. Empty/unknown role -> fallback to the shared orchestrator
|
||||
# token (PLANE_HEADERS), so commenting stays autonomous.
|
||||
PLANE_BOT_TOKENS = {
|
||||
"analyst": settings.plane_bot_analyst,
|
||||
"architect": settings.plane_bot_architect,
|
||||
"developer": settings.plane_bot_developer,
|
||||
"reviewer": settings.plane_bot_reviewer,
|
||||
"tester": settings.plane_bot_tester,
|
||||
"deployer": settings.plane_bot_deployer,
|
||||
"stream": settings.plane_bot_stream,
|
||||
}
|
||||
|
||||
# Map a pipeline stage -> the agent role that owns work in that stage. Used to
|
||||
# pick an author for rollback/stage notifications targeting a specific stage.
|
||||
STAGE_AUTHORS = {
|
||||
"analysis": "analyst",
|
||||
"architecture": "architect",
|
||||
"development": "developer",
|
||||
"review": "reviewer",
|
||||
"testing": "tester",
|
||||
"deploy": "deployer",
|
||||
}
|
||||
|
||||
|
||||
def _headers_for(author: str | None) -> dict:
|
||||
"""Return X-API-Key headers for the given agent role.
|
||||
|
||||
Falls back to the shared orchestrator token (PLANE_HEADERS /
|
||||
settings.plane_api_token) when the role is None, unknown, or its bot token
|
||||
is not configured. This keeps comment posting autonomous: a comment is
|
||||
always written, just attributed to the orchestrator if no bot is set.
|
||||
"""
|
||||
tok = PLANE_BOT_TOKENS.get(author or "") if author else None
|
||||
return {"X-API-Key": tok} if tok else PLANE_HEADERS
|
||||
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
|
||||
|
||||
@@ -46,7 +84,12 @@ def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str
|
||||
logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
|
||||
return PROJECT_ID
|
||||
|
||||
# Plane state IDs
|
||||
# Plane state IDs.
|
||||
# TODO(ORCH-10): these UUIDs are PER-PROJECT. The 6 stage-visibility / verdict
|
||||
# statuses below were created only in the enduro project (7a79f0a9-...). One
|
||||
# project is in prod today, so a single global dict is acceptable. When more
|
||||
# projects are onboarded these must be resolved per project (see ORCH-10 in
|
||||
# BACKLOG.md / the ORCH-6 project registry) — do NOT hardcode globally then.
|
||||
PLANE_STATES = {
|
||||
"backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
|
||||
"todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
|
||||
@@ -56,16 +99,39 @@ PLANE_STATES = {
|
||||
"blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
|
||||
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
|
||||
"cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
|
||||
# Feature 3 (stage visibility) — per-stage statuses on the board.
|
||||
"architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
|
||||
"development": "9920609b-f140-4e46-ab95-89acda8412c8",
|
||||
"review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
|
||||
"testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
|
||||
# Feature 2 (verdict statuses) — Approved / Rejected.
|
||||
"approved": "a519a341-dada-4a91-8910-7604f82b79c5",
|
||||
"rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
|
||||
}
|
||||
|
||||
# Map orchestrator stages to Plane states
|
||||
# Feature 3: map an orchestrator stage -> the Plane status to show on the board
|
||||
# when the pipeline ENTERS that stage. analysis stays driven by the existing
|
||||
# in_progress/in_review/needs_input logic (no dedicated status). deploy keeps
|
||||
# in_progress until done. Needs Input / In Review / Blocked remain higher
|
||||
# priority and are set explicitly elsewhere — do NOT override them from here.
|
||||
STAGE_VISIBILITY_STATE = {
|
||||
"architecture": "architecture",
|
||||
"development": "development",
|
||||
"review": "review",
|
||||
"testing": "testing",
|
||||
}
|
||||
|
||||
# Map orchestrator stages to Plane states (used by update_issue_state /
|
||||
# notify_stage_change). Feature 3: architecture/development/review/testing now
|
||||
# point at their dedicated board statuses so the task physically moves across
|
||||
# columns. analysis -> in_progress, deploy -> in_progress, done -> done.
|
||||
STAGE_TO_STATE = {
|
||||
"created": PLANE_STATES["todo"],
|
||||
"analysis": PLANE_STATES["in_progress"],
|
||||
"architecture": PLANE_STATES["in_progress"],
|
||||
"development": PLANE_STATES["in_progress"],
|
||||
"review": PLANE_STATES["in_progress"],
|
||||
"testing": PLANE_STATES["in_progress"],
|
||||
"architecture": PLANE_STATES["architecture"],
|
||||
"development": PLANE_STATES["development"],
|
||||
"review": PLANE_STATES["review"],
|
||||
"testing": PLANE_STATES["testing"],
|
||||
"deploy": PLANE_STATES["in_progress"],
|
||||
"done": PLANE_STATES["done"],
|
||||
}
|
||||
@@ -89,6 +155,84 @@ def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
import re as _re
|
||||
|
||||
|
||||
def _strip_html(html: str) -> str:
|
||||
"""Crude HTML -> text: drop tags and collapse whitespace. Good enough to
|
||||
feed QG-0's length check when Plane only gives us description_html."""
|
||||
if not html:
|
||||
return ""
|
||||
text = _re.sub(r"<[^>]+>", " ", html)
|
||||
return _re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
|
||||
def fetch_issue_description(issue_id: str, project_id: str) -> str:
|
||||
"""BUG 1: GET the Plane issue by UUID and return its description text.
|
||||
|
||||
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
|
||||
CHANGED fields, so ``description``/``description_stripped`` are usually
|
||||
absent there. start_pipeline calls this to pull the full description from the
|
||||
issue detail endpoint so QG-0 does not blow up on an empty payload field.
|
||||
|
||||
Reuses the exact GET issue detail endpoint / shared token already used by
|
||||
``fetch_issue_sequence_id`` (same URL, same PLANE_HEADERS). Prefers
|
||||
``description_stripped``; falls back to stripping ``description_html``.
|
||||
|
||||
Returns "" on network error, non-2xx, or a missing field - never raises, so
|
||||
a Plane outage degrades to the honest "empty description" QG-0 path instead
|
||||
of crashing the webhook.
|
||||
"""
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
desc = body.get("description_stripped")
|
||||
if desc and desc.strip():
|
||||
return desc
|
||||
return _strip_html(body.get("description_html") or "")
|
||||
except Exception as e:
|
||||
logger.warning(f"fetch_issue_description failed for {issue_id}: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
|
||||
"""BUG B: GET the Plane issue by UUID ONCE and return (name, description).
|
||||
|
||||
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
|
||||
CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in
|
||||
the payload. start_pipeline needs the real title (for the branch slug) and
|
||||
the real description (for the analyst .task.md). To avoid issuing two
|
||||
separate issue-detail GETs (one for name, one for description), this single
|
||||
request returns both.
|
||||
|
||||
Reuses the exact GET issue detail endpoint / shared token already used by
|
||||
``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the
|
||||
description it applies the same logic as ``fetch_issue_description``
|
||||
(prefer ``description_stripped``, fall back to stripping
|
||||
``description_html``).
|
||||
|
||||
Returns ("", "") on network error, non-2xx, or missing body - never raises,
|
||||
so a Plane outage degrades gracefully (caller keeps its payload fallbacks).
|
||||
"""
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
name = (body.get("name") or "").strip()
|
||||
desc = body.get("description_stripped")
|
||||
if desc and desc.strip():
|
||||
description = desc
|
||||
else:
|
||||
description = _strip_html(body.get("description_html") or "")
|
||||
return name, description
|
||||
except Exception as e:
|
||||
logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}")
|
||||
return "", ""
|
||||
|
||||
|
||||
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
||||
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
@@ -159,8 +303,14 @@ def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
|
||||
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
|
||||
|
||||
|
||||
def add_comment(work_item_id: str, text: str, project_id: str = None):
|
||||
"""Add a comment to Plane issue."""
|
||||
def add_comment(work_item_id: str, text: str, project_id: str = None, author: str = None):
|
||||
"""Add a comment to a Plane issue.
|
||||
|
||||
feat(plane): when ``author`` (an agent role) maps to a configured bot
|
||||
token, the comment is POSTed under that bot so Plane shows the real author.
|
||||
Otherwise it falls back to the shared orchestrator token (see
|
||||
``_headers_for``). GET/PATCH calls elsewhere keep using PLANE_HEADERS.
|
||||
"""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
issue_id = find_issue_id(work_item_id, project_id)
|
||||
if not issue_id:
|
||||
@@ -170,9 +320,9 @@ def add_comment(work_item_id: str, text: str, project_id: str = None):
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
|
||||
html = f"<p>{text}</p>"
|
||||
try:
|
||||
resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10)
|
||||
resp = httpx.post(url, headers=_headers_for(author), json={"comment_html": html}, timeout=10)
|
||||
resp.raise_for_status()
|
||||
logger.info(f"Plane: comment added to {work_item_id}")
|
||||
logger.info(f"Plane: comment added to {work_item_id} (author={author or 'orchestrator'})")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add comment to {work_item_id}: {e}")
|
||||
|
||||
@@ -198,6 +348,21 @@ def set_issue_in_progress(work_item_id: str, project_id: str = None):
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)
|
||||
|
||||
|
||||
def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None):
|
||||
"""Feature 3: move the issue to the board status for a pipeline stage.
|
||||
|
||||
Only the visible-stage statuses (architecture/development/review/testing)
|
||||
are driven here — stages without a dedicated status (analysis/deploy) are a
|
||||
no-op so the existing in_progress/in_review/needs_input logic stays in
|
||||
charge. By design this does NOT touch Needs Input / In Review / Blocked,
|
||||
which are higher priority and set explicitly by their own helpers.
|
||||
"""
|
||||
state_key = STAGE_VISIBILITY_STATE.get(stage)
|
||||
if not state_key:
|
||||
return
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES[state_key], project_id)
|
||||
|
||||
|
||||
def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
|
||||
"""Set issue state directly by state_id."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
@@ -252,16 +417,29 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
add_comment(work_item_id, msg, project_id)
|
||||
# Stage transition is the orchestrator's own voice -> attribute to stream.
|
||||
add_comment(work_item_id, msg, project_id, author="stream")
|
||||
|
||||
|
||||
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
|
||||
"""Notify Plane about QG failure."""
|
||||
add_comment(work_item_id, f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}", project_id)
|
||||
# QG failure belongs to the agent that owns the failing stage.
|
||||
add_comment(
|
||||
work_item_id,
|
||||
f"{EMOJI_QG_FAIL} QG failed at {stage}: {check} — {reason}",
|
||||
project_id,
|
||||
author=STAGE_AUTHORS.get(stage, "stream"),
|
||||
)
|
||||
|
||||
|
||||
def notify_done(work_item_id: str, project_id: str = None):
|
||||
"""Mark issue as Done in Plane."""
|
||||
project_id = _resolve_project_id(work_item_id, project_id)
|
||||
update_issue_state(work_item_id, "done", project_id)
|
||||
add_comment(work_item_id, f"{EMOJI_DONE} Task completed! PR merged and deployed.", project_id)
|
||||
# Deploy finished the task -> attribute the completion comment to Deployer.
|
||||
add_comment(
|
||||
work_item_id,
|
||||
f"{EMOJI_DONE} Task completed! PR merged and deployed.",
|
||||
project_id,
|
||||
author="deployer",
|
||||
)
|
||||
|
||||
@@ -249,9 +249,17 @@ def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = No
|
||||
|
||||
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
"""
|
||||
DEPRECATED: replaced by check_ci_green on the development stage (CI is now
|
||||
configured). Kept for backward-compat; not wired to any stage.
|
||||
|
||||
S-1 fix: run the project test suite locally and judge by exit code, instead of
|
||||
depending on Gitea CI (which is not configured -> always false).
|
||||
|
||||
БАГ 5 fix: invoke pytest directly instead of make test. make is not installed
|
||||
in the orchestrator container, so the previous ["make", "test"] call raised
|
||||
FileNotFoundError. This reproduces the Makefile test target 1:1
|
||||
(cd src/api && python -m pytest ../../tests/ -v).
|
||||
|
||||
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
|
||||
is safe for concurrent active tasks — no shared /repos checkout race.
|
||||
"""
|
||||
@@ -259,7 +267,8 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
try:
|
||||
repo_path = ensure_worktree(repo, branch)
|
||||
r = subprocess.run(
|
||||
["make", "test"], cwd=repo_path,
|
||||
["python", "-m", "pytest", "../../tests/", "-v"],
|
||||
cwd=os.path.join(repo_path, "src", "api"),
|
||||
capture_output=True, text=True, timeout=600,
|
||||
)
|
||||
if r.returncode == 0:
|
||||
|
||||
@@ -189,36 +189,48 @@ def advance_stage(
|
||||
|
||||
# --- Quality gate ----------------------------------------------------
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||
# Human-approval gate: split by path.
|
||||
if qg_name == "check_analysis_approved":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
# Launcher path (analyst just finished): set In Review + ask for
|
||||
# the Approved status. This gate never advances on its own -- a
|
||||
# human Approved verdict does that.
|
||||
if agent == "analyst":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
# Webhook Approved-verdict path (agent is None): the human flipped
|
||||
# the Plane status to Approved, which IS the approval. The gate is
|
||||
# satisfied -- do NOT re-run check_analysis_approved (it looks for
|
||||
# an :approved: *comment* and would block on a status-only
|
||||
# approval). Mark it passed and fall through to the Advance block.
|
||||
result.qg_name = qg_name
|
||||
result.qg_passed = True
|
||||
result.qg_reason = "approved-via-status"
|
||||
else:
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
|
||||
elif qg_name:
|
||||
# QG name set but not registered — do not advance (launcher behavior).
|
||||
@@ -257,6 +269,58 @@ def advance_stage(
|
||||
return result
|
||||
|
||||
|
||||
def _build_analyst_ready_comment(repo: str, work_item_id: str, branch: str) -> str:
|
||||
"""BUG C: HTML comment posted when analyst artifacts are ready.
|
||||
|
||||
Status-only model (PR #12): approval is the **Approved** status, NOT a
|
||||
``:approved:`` comment and NOT moving back to In Progress. The comment asks
|
||||
the stakeholder to flip the status and links the documents the analyst
|
||||
actually produced.
|
||||
|
||||
Links point at the Gitea web view:
|
||||
{gitea_url}/{owner}/{repo}/src/branch/{branch}/docs/work-items/{wid}/<file>
|
||||
Only files that REALLY exist in the worktree are listed (no invented docs).
|
||||
"""
|
||||
text = (
|
||||
"\u2705 BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u0414\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f "
|
||||
"\u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 "
|
||||
"\u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved. "
|
||||
"\u0414\u043b\u044f \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f \u2014 "
|
||||
"\u043d\u0430\u043f\u0438\u0448\u0438\u0442\u0435 \u043f\u0440\u0438\u0447\u0438\u043d\u0443 "
|
||||
"\u043a\u043e\u043c\u043c\u0435\u043d\u0442\u043e\u043c \u0438 \u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 "
|
||||
"\u0432 Rejected."
|
||||
)
|
||||
|
||||
# Candidate analyst artifacts (label -> filename). Only existing ones linked.
|
||||
candidates = [
|
||||
("Business request", "00-business-request.md"),
|
||||
("BRD", "01-brd.md"),
|
||||
("\u0422\u0417 (TRZ)", "02-trz.md"),
|
||||
("Acceptance Criteria", "03-acceptance-criteria.md"),
|
||||
("Test Plan", "04-test-plan.yaml"),
|
||||
("UI Test Cases", "04b-ui-test-cases.md"),
|
||||
]
|
||||
rel_dir = f"docs/work-items/{work_item_id}"
|
||||
try:
|
||||
wt_dir = os.path.join(get_worktree_path(repo, branch), rel_dir)
|
||||
except Exception:
|
||||
wt_dir = None
|
||||
|
||||
owner = getattr(settings, "gitea_owner", "admin")
|
||||
base = (getattr(settings, "gitea_public_url", "") or settings.gitea_url).rstrip("/")
|
||||
links = []
|
||||
for label, fname in candidates:
|
||||
if wt_dir and not os.path.isfile(os.path.join(wt_dir, fname)):
|
||||
continue
|
||||
href = f"{base}/{owner}/{repo}/src/branch/{branch}/{rel_dir}/{fname}"
|
||||
links.append(f'<li><a href="{href}">{label}</a></li>')
|
||||
|
||||
if links:
|
||||
text += "<br><b>\u0414\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u044b:</b><ul>" + "".join(links) + "</ul>"
|
||||
return text
|
||||
|
||||
|
||||
def _handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||
):
|
||||
@@ -279,18 +343,17 @@ def _handle_analysis_approved_flow(
|
||||
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review, ask for :approved:.
|
||||
# Full artifacts ready -> In Review, ask for the Approved STATUS (BUG C).
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
||||
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
||||
_build_analyst_ready_comment(repo, work_item_id, branch),
|
||||
author="analyst",
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
result.note = "analysis-in-review"
|
||||
logger.info(
|
||||
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
||||
f"Task {task_id}: analyst finished, requested Approved status in Plane"
|
||||
)
|
||||
return
|
||||
|
||||
@@ -305,6 +368,7 @@ def _handle_analysis_approved_flow(
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
|
||||
author="analyst",
|
||||
)
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
@@ -316,6 +380,7 @@ def _handle_analysis_approved_flow(
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
|
||||
author="analyst",
|
||||
)
|
||||
result.note = "analysis-empty"
|
||||
|
||||
@@ -370,6 +435,7 @@ def _handle_qg_failure_rollbacks(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
|
||||
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="tester",
|
||||
)
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
@@ -410,6 +476,7 @@ def _handle_qg_failure_rollbacks(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
|
||||
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
|
||||
author="architect",
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
|
||||
@@ -13,7 +13,7 @@ STAGE_TRANSITIONS = {
|
||||
"created": {"next": "analysis", "agent": "analyst", "qg": None},
|
||||
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
|
||||
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_tests_local"},
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
|
||||
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
||||
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
||||
"deploy": {"next": "done", "agent": None, "qg": None},
|
||||
|
||||
268
src/usage.py
Normal file
268
src/usage.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""Feature 4: token / cost accounting for agent runs.
|
||||
|
||||
claude --output-format json emits a single result JSON object at the end of the
|
||||
run log with fields:
|
||||
total_cost_usd
|
||||
usage.input_tokens / output_tokens / cache_read_input_tokens /
|
||||
cache_creation_input_tokens
|
||||
modelUsage, num_turns, duration_ms
|
||||
|
||||
This module parses that JSON out of a (text-or-json) run log, records the usage
|
||||
on the agent_runs row, formats a Plane comment for the finishing agent, and
|
||||
builds the per-task summary the Deployer posts on deploy/done.
|
||||
|
||||
Everything here is defensive: a missing/garbled JSON never raises \u2014 we record
|
||||
NULL/0 and log a warning so a broken agent run can't crash the monitor.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from .db import get_db
|
||||
|
||||
logger = logging.getLogger("orchestrator.usage")
|
||||
|
||||
|
||||
def parse_usage_from_text(text: str) -> dict | None:
|
||||
"""Extract the claude result-JSON usage from a run log's text.
|
||||
|
||||
The log may contain plain text before/after the JSON; with
|
||||
--output-format json the JSON is the final object. We scan for the LAST
|
||||
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
||||
|
||||
Returns a normalised dict
|
||||
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
|
||||
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
||||
"""
|
||||
if not text:
|
||||
return None
|
||||
|
||||
candidate = _extract_last_json_object(text)
|
||||
if candidate is None:
|
||||
return None
|
||||
|
||||
usage = candidate.get("usage") or {}
|
||||
if not isinstance(usage, dict):
|
||||
usage = {}
|
||||
|
||||
cost = candidate.get("total_cost_usd")
|
||||
if cost is None:
|
||||
cost = candidate.get("cost_usd")
|
||||
|
||||
# If there is neither a usage block nor a cost, this isn't a result object.
|
||||
if not usage and cost is None:
|
||||
return None
|
||||
|
||||
def _int(v):
|
||||
try:
|
||||
return int(v)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
def _float(v):
|
||||
try:
|
||||
return float(v)
|
||||
except (TypeError, ValueError):
|
||||
return 0.0
|
||||
|
||||
return {
|
||||
"input_tokens": _int(usage.get("input_tokens")),
|
||||
"output_tokens": _int(usage.get("output_tokens")),
|
||||
"cache_read_tokens": _int(
|
||||
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
||||
),
|
||||
"cost_usd": _float(cost),
|
||||
}
|
||||
|
||||
|
||||
def _extract_last_json_object(text: str) -> dict | None:
|
||||
"""Return the last balanced top-level JSON object in `text` that parses.
|
||||
|
||||
Scans from the end for '}' and walks back to the matching '{' using a depth
|
||||
counter (string-aware), trying json.loads on each candidate. Robust to log
|
||||
lines or text emitted before the JSON.
|
||||
"""
|
||||
# Fast path: the whole stripped text is the JSON.
|
||||
stripped = text.strip()
|
||||
try:
|
||||
obj = json.loads(stripped)
|
||||
if isinstance(obj, dict):
|
||||
return obj
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# Otherwise find the last balanced { ... } block.
|
||||
end = len(text)
|
||||
while True:
|
||||
close = text.rfind("}", 0, end)
|
||||
if close == -1:
|
||||
return None
|
||||
depth = 0
|
||||
in_str = False
|
||||
esc = False
|
||||
start = None
|
||||
for i in range(close, -1, -1):
|
||||
ch = text[i]
|
||||
if in_str:
|
||||
if esc:
|
||||
esc = False
|
||||
elif ch == "\\":
|
||||
esc = True
|
||||
elif ch == '"':
|
||||
in_str = False
|
||||
continue
|
||||
if ch == '"':
|
||||
in_str = True
|
||||
elif ch == "}":
|
||||
depth += 1
|
||||
elif ch == "{":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
start = i
|
||||
break
|
||||
if start is not None:
|
||||
blob = text[start:close + 1]
|
||||
try:
|
||||
obj = json.loads(blob)
|
||||
if isinstance(obj, dict):
|
||||
return obj
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
end = close # keep scanning earlier in the text
|
||||
|
||||
|
||||
def parse_usage_from_log(path: str) -> dict | None:
|
||||
"""Read a run log file and parse usage from it. Never raises."""
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
return parse_usage_from_text(f.read())
|
||||
except OSError as e:
|
||||
logger.warning(f"parse_usage_from_log: cannot read {path}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def record_usage(run_id: int, usage: dict | None):
|
||||
"""Write parsed usage onto the agent_runs row. NULLs if usage is None."""
|
||||
if usage is None:
|
||||
logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
|
||||
usage = {}
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
||||
"cache_read_tokens=?, cost_usd=? WHERE id=?",
|
||||
(
|
||||
usage.get("input_tokens"),
|
||||
usage.get("output_tokens"),
|
||||
usage.get("cache_read_tokens"),
|
||||
usage.get("cost_usd"),
|
||||
run_id,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def fmt_tokens(n) -> str:
|
||||
"""Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'."""
|
||||
try:
|
||||
n = int(n or 0)
|
||||
except (TypeError, ValueError):
|
||||
n = 0
|
||||
if n >= 1_000_000:
|
||||
return f"{n / 1_000_000:.1f}M"
|
||||
if n >= 1_000:
|
||||
return f"{n / 1_000:.1f}k"
|
||||
return str(n)
|
||||
|
||||
|
||||
def fmt_cost(c) -> str:
|
||||
"""Format USD cost with 2 decimals: '$0.21'."""
|
||||
try:
|
||||
c = float(c or 0.0)
|
||||
except (TypeError, ValueError):
|
||||
c = 0.0
|
||||
return f"${c:.2f}"
|
||||
|
||||
|
||||
# Pretty agent names for comments (mirrors STAGE_AUTHORS roles).
|
||||
AGENT_DISPLAY = {
|
||||
"analyst": "Analyst",
|
||||
"architect": "Architect",
|
||||
"developer": "Developer",
|
||||
"reviewer": "Reviewer",
|
||||
"tester": "Tester",
|
||||
"deployer": "Deployer",
|
||||
}
|
||||
|
||||
|
||||
def usage_comment(agent: str, usage: dict | None) -> str:
|
||||
"""Build the per-agent finish comment, e.g.
|
||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
|
||||
"""
|
||||
usage = usage or {}
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
icon = AGENT_ICON.get(agent, "\u2705")
|
||||
return (
|
||||
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
|
||||
f"{fmt_tokens(usage.get('input_tokens'))} in / "
|
||||
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
|
||||
f"{fmt_cost(usage.get('cost_usd'))}"
|
||||
)
|
||||
|
||||
|
||||
AGENT_ICON = {
|
||||
"analyst": "\U0001f50d",
|
||||
"architect": "\U0001f4d0",
|
||||
"developer": "\U0001f4bb",
|
||||
"reviewer": "\U0001f50e",
|
||||
"tester": "\U0001f9ea",
|
||||
"deployer": "\U0001f680",
|
||||
}
|
||||
|
||||
|
||||
def task_usage_summary(task_id: int) -> dict:
|
||||
"""Aggregate agent_runs usage for a task.
|
||||
|
||||
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT agent, "
|
||||
"COALESCE(SUM(input_tokens),0), "
|
||||
"COALESCE(SUM(output_tokens),0), "
|
||||
"COALESCE(SUM(cost_usd),0.0) "
|
||||
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
|
||||
(task_id,),
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
|
||||
total_in = sum(r[1] for r in per_agent)
|
||||
total_out = sum(r[2] for r in per_agent)
|
||||
total_cost = sum(r[3] for r in per_agent)
|
||||
return {
|
||||
"total_in": total_in,
|
||||
"total_out": total_out,
|
||||
"total_cost": total_cost,
|
||||
"per_agent": per_agent,
|
||||
}
|
||||
|
||||
|
||||
def task_summary_comment(task_id: int) -> str:
|
||||
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
|
||||
s = task_usage_summary(task_id)
|
||||
lines = [
|
||||
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
|
||||
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
|
||||
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
|
||||
f"{fmt_cost(s['total_cost'])}"
|
||||
]
|
||||
for agent, ti, to, cost in s["per_agent"]:
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
lines.append(
|
||||
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
@@ -216,12 +216,31 @@ async def handle_ci_status(payload: dict):
|
||||
else:
|
||||
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
|
||||
|
||||
elif state == "failure":
|
||||
# S-1: Gitea CI is NOT the authoritative gate anymore (the orchestrator runs
|
||||
# tests locally via check_tests_local). Gitea CI is often unconfigured, so a
|
||||
# "failure"/empty status here is not actionable. Log only, do not alert.
|
||||
logger.debug(f"Task {task_id}: Gitea CI state='failure' on branch '{branch}' "
|
||||
f"(non-authoritative, suppressed — local tests are the gate)")
|
||||
elif state == "failure" and current_stage == "development":
|
||||
# CI is the authoritative gate for development -> review.
|
||||
# On red CI: notify, then bounce the task back to the developer (capped retries),
|
||||
# symmetric to the review REQUEST_CHANGES path.
|
||||
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
|
||||
conn = get_db()
|
||||
retry_count = conn.execute(
|
||||
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
|
||||
(task_id,),
|
||||
).fetchone()["cnt"]
|
||||
conn.close()
|
||||
if retry_count < MAX_DEV_RETRIES:
|
||||
# task already on 'development' — no stage change needed, just relaunch developer
|
||||
try:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||
)
|
||||
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
|
||||
else:
|
||||
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
|
||||
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
|
||||
|
||||
|
||||
async def handle_pr(payload: dict):
|
||||
|
||||
@@ -13,6 +13,7 @@ from ..db import (
|
||||
get_db,
|
||||
get_task_by_plane_id,
|
||||
get_next_work_item_id,
|
||||
ensure_unique_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
@@ -92,38 +93,264 @@ async def plane_webhook(request: Request):
|
||||
return {"status": "ignored", "reason": "unknown project"}
|
||||
|
||||
if (event == "work_item.created") or (event == "issue" and action == "created"):
|
||||
# Feature 1: creation NO LONGER starts the pipeline. Slava keeps the
|
||||
# backlog until he moves an issue to In Progress. We only run a soft
|
||||
# QG-0 sanity log here (no branch, no analyst, no task row).
|
||||
await handle_work_item_created(data, project_id)
|
||||
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
|
||||
# Status-only verdict model: status changes drive the pipeline.
|
||||
# Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
|
||||
# stage agent if returned from
|
||||
# Needs Input.
|
||||
# -> Approved : advance to the next stage.
|
||||
# -> Rejected : rollback (reason from latest comment).
|
||||
await handle_issue_updated(data, project_id)
|
||||
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
|
||||
await handle_comment(data, project_id)
|
||||
|
||||
return {"status": "accepted"}
|
||||
|
||||
|
||||
async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
def _state_id(data: dict) -> str:
|
||||
"""Extract the new Plane state UUID from an 'issue updated' payload.
|
||||
|
||||
Real payload (verified from prod events): data.state is
|
||||
{id, name, color, group}. Some payloads carry state as a bare UUID string.
|
||||
"""
|
||||
New work item created in Plane.
|
||||
QG-0: validate title, description, priority.
|
||||
If valid: create branch, init docs, launch analyst.
|
||||
If invalid: comment with what's missing, set Blocked.
|
||||
state = data.get("state")
|
||||
if isinstance(state, dict):
|
||||
return state.get("id", "") or ""
|
||||
if isinstance(state, str):
|
||||
return state
|
||||
return ""
|
||||
|
||||
|
||||
async def handle_issue_updated(data: dict, project_id: str = ""):
|
||||
"""Feature 1 & 2: react to a Plane issue status change.
|
||||
|
||||
Routes the NEW state UUID (data.state.id) to:
|
||||
- in_progress : start the pipeline if this issue has no task yet; if a
|
||||
task already exists and the stage agent is idle (returned from Needs
|
||||
Input), relaunch the stage agent so it reads Slava's fresh comments.
|
||||
- approved : advance to the next stage.
|
||||
- rejected : rollback to the previous stage (reason from latest comment).
|
||||
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
|
||||
is ignored here — those are statuses the orchestrator itself sets.
|
||||
"""
|
||||
from ..plane_sync import PLANE_STATES
|
||||
|
||||
plane_id = str(data.get("id") or "")
|
||||
new_state = _state_id(data)
|
||||
if not plane_id or not new_state:
|
||||
logger.info("issue updated without id/state, ignoring")
|
||||
return
|
||||
|
||||
if new_state == PLANE_STATES["in_progress"]:
|
||||
await handle_status_start(data, project_id)
|
||||
elif new_state == PLANE_STATES["approved"]:
|
||||
await handle_verdict(data, project_id, approved=True)
|
||||
elif new_state == PLANE_STATES["rejected"]:
|
||||
await handle_verdict(data, project_id, approved=False)
|
||||
else:
|
||||
logger.info(f"issue {plane_id} updated to state {new_state[:8]}..., no pipeline action")
|
||||
|
||||
|
||||
async def handle_status_start(data: dict, project_id: str = ""):
|
||||
"""An issue moved into In Progress.
|
||||
|
||||
Two cases under the status-only verdict model:
|
||||
|
||||
1. No task yet for this plane_id -> START the pipeline (start_pipeline).
|
||||
|
||||
2. A task already exists -> this is Slava returning the issue from
|
||||
Needs Input to In Progress after answering the analyst's questions. We
|
||||
must RELAUNCH the current stage's agent so it reads the fresh comments
|
||||
from Plane (the answer-to-questions flow used to live in handle_comment;
|
||||
it is now status-driven).
|
||||
|
||||
KEY FORK — telling "answer to questions" apart from a plain duplicate In
|
||||
Progress webhook (the dedup-protection case):
|
||||
|
||||
The tasks table stores no Plane status, and the issue.updated payload only
|
||||
carries the NEW state (In Progress), so we cannot read the previous status
|
||||
from here. Instead we use the only reliable local signal: whether the
|
||||
stage's agent is currently in flight.
|
||||
|
||||
- The orchestrator sets In Progress itself while an agent runs. When the
|
||||
agent FINISHES it leaves the issue in Needs Input or In Review and has
|
||||
NO queued/running job. So: an existing task with NO active job means the
|
||||
agent is idle / waiting -> a return to In Progress is a genuine relaunch
|
||||
request -> enqueue the stage agent.
|
||||
- If a queued/running job already exists for the task, the agent is busy
|
||||
(or a duplicate webhook arrived) -> SKIP (no double launch). The events
|
||||
de-dup at the top of plane_webhook already absorbs identical webhook
|
||||
bodies; this job guard additionally covers distinct webhooks fired while
|
||||
a job is still pending/running.
|
||||
"""
|
||||
from ..db import has_active_job_for_task
|
||||
|
||||
plane_id = str(data.get("id") or "")
|
||||
existing = get_task_by_plane_id(plane_id)
|
||||
|
||||
if not existing:
|
||||
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
|
||||
await start_pipeline(data, project_id)
|
||||
return
|
||||
|
||||
task_id = existing["id"]
|
||||
current_stage = existing["stage"]
|
||||
repo = existing["repo"]
|
||||
work_item_id = existing.get("work_item_id", "")
|
||||
branch = existing.get("branch", "")
|
||||
|
||||
# Duplicate / busy guard: a job is already pending or running for this task.
|
||||
if has_active_job_for_task(task_id):
|
||||
logger.info(
|
||||
f"Status->In Progress for {plane_id}: task {task_id} already has an "
|
||||
f"active job (stage={current_stage}), not relaunching"
|
||||
)
|
||||
return
|
||||
|
||||
# Agent is idle -> Slava answered questions and returned the issue to In
|
||||
# Progress. Relaunch the current stage's agent to read the fresh comments.
|
||||
from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment
|
||||
stage_agent = STAGE_AUTHORS.get(current_stage)
|
||||
if not stage_agent:
|
||||
logger.info(
|
||||
f"Status->In Progress for {plane_id}: no agent for stage "
|
||||
f"'{current_stage}', not relaunching"
|
||||
)
|
||||
return
|
||||
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
|
||||
f"Progress (answered your questions). Read the latest comments in Plane "
|
||||
f"and revise your artifacts."
|
||||
)
|
||||
job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(
|
||||
f"Task {task_id}: returned to In Progress (Needs Input answered), "
|
||||
f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
|
||||
)
|
||||
try:
|
||||
_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
|
||||
author=stage_agent,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
|
||||
|
||||
|
||||
async def handle_verdict(data: dict, project_id: str, approved: bool):
|
||||
"""Status-only verdict: a Plane status change drives advance / rollback.
|
||||
|
||||
Approved status -> _try_advance_stage. We do NOT touch the issue status here:
|
||||
_try_advance_stage -> advance_stage -> plane_notify_stage already PATCHes the
|
||||
issue to the NEXT stage's status. The old set_issue_in_progress call reset
|
||||
the status to In Progress first, which made the board flicker In Progress
|
||||
before the next stage (part of bug 3); it is removed.
|
||||
|
||||
Rejected status -> rollback to the previous stage. The reason is pulled from
|
||||
the issue's latest comment (Slava writes the reason in a comment before/with
|
||||
flipping the status to Rejected).
|
||||
"""
|
||||
plane_id = str(data.get("id") or "")
|
||||
task = get_task_by_plane_id(plane_id)
|
||||
if not task:
|
||||
logger.warning(f"Verdict status for {plane_id} but no task found, ignoring")
|
||||
return
|
||||
|
||||
task_id = task["id"]
|
||||
current_stage = task["stage"]
|
||||
repo = task["repo"]
|
||||
work_item_id = task.get("work_item_id", "")
|
||||
branch = task.get("branch", "")
|
||||
|
||||
if approved:
|
||||
# NOTE: no set_issue_in_progress here — _try_advance_stage sets the next
|
||||
# stage's status itself (advance_stage -> plane_notify_stage).
|
||||
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
|
||||
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
|
||||
return
|
||||
|
||||
# Rejected: pull the rejection reason from the issue's latest comment.
|
||||
issue_id = task.get("plane_issue_id") or task.get("plane_id") or plane_id
|
||||
reason = _latest_comment_reason(issue_id, repo, project_id)
|
||||
await _rollback_stage(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason
|
||||
)
|
||||
|
||||
|
||||
def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str:
|
||||
"""Fetch the issue's most recent comment text (HTML stripped) as the reject
|
||||
reason. Slava writes the reason in a comment before/with flipping the status
|
||||
to Rejected.
|
||||
|
||||
Returns a fixed fallback when there is no comment / the API call fails.
|
||||
"""
|
||||
from ..plane_sync import (
|
||||
PLANE_BASE,
|
||||
PLANE_HEADERS,
|
||||
WORKSPACE,
|
||||
PROJECT_ID as _DEFAULT_PROJECT_ID,
|
||||
)
|
||||
fallback = "Rejected via status, no reason comment"
|
||||
if not issue_id:
|
||||
return fallback
|
||||
_proj = get_project_by_repo(repo)
|
||||
pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
|
||||
url = (
|
||||
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/"
|
||||
f"{issue_id}/comments/"
|
||||
)
|
||||
try:
|
||||
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
|
||||
if resp.status_code != 200:
|
||||
logger.warning(
|
||||
f"reject-reason: GET comments for {issue_id} returned "
|
||||
f"{resp.status_code}"
|
||||
)
|
||||
return fallback
|
||||
payload = resp.json()
|
||||
comments = payload.get("results", payload) if isinstance(payload, dict) else payload
|
||||
if not comments:
|
||||
return fallback
|
||||
latest = max(comments, key=lambda c: c.get("created_at", "") or "")
|
||||
raw = (
|
||||
latest.get("comment_stripped")
|
||||
or latest.get("comment_html")
|
||||
or latest.get("comment")
|
||||
or ""
|
||||
)
|
||||
text = re.sub(r"<[^>]+>", "", raw).strip()
|
||||
return text[:300] if text else fallback
|
||||
except Exception as e:
|
||||
logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}")
|
||||
return fallback
|
||||
|
||||
|
||||
async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
"""Feature 1: creation does NOT start the pipeline anymore.
|
||||
|
||||
The pipeline is started when Slava moves the issue into In Progress
|
||||
(handle_status_start -> start_pipeline). On creation we only run a SOFT QG-0
|
||||
sanity check and log the result — NO branch, NO docs, NO analyst, NO task row
|
||||
— so the issue can sit in the backlog until Slava is ready.
|
||||
"""
|
||||
plane_id = data.get("id", "")
|
||||
name = data.get("name", "untitled")
|
||||
description = data.get("description_stripped", data.get("description", ""))
|
||||
priority = data.get("priority", {})
|
||||
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
|
||||
errors = _qg0_errors(name, description)
|
||||
if errors:
|
||||
logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {errors}")
|
||||
else:
|
||||
logger.info(f"work_item.created {plane_id} ('{name}'): in backlog, awaiting In Progress")
|
||||
|
||||
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
|
||||
# the single hardcoded default_repo.
|
||||
if not project_id:
|
||||
project_id = data.get("project") or data.get("project_id") or ""
|
||||
proj = get_project_by_plane_id(project_id)
|
||||
if not proj:
|
||||
logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}")
|
||||
return
|
||||
repo = proj.repo
|
||||
plane_project_id = proj.plane_project_id
|
||||
|
||||
# QG-0 validation
|
||||
def _qg0_errors(name: str, description: str) -> list:
|
||||
"""QG-0 validation: returns a list of human-readable problems (empty = OK)."""
|
||||
errors = []
|
||||
if not name or len(name) < 5:
|
||||
errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
||||
@@ -132,6 +359,66 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
if not description or len(description.strip()) < 20:
|
||||
errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
async def start_pipeline(data: dict, project_id: str = ""):
|
||||
"""Feature 1: start the pipeline for an issue (moved to In Progress).
|
||||
|
||||
This is the body extracted from the old handle_work_item_created: resolve the
|
||||
project, run QG-0 (hard — blocks on failure), create the work item id +
|
||||
branch + initial docs, insert the task row, and enqueue the analyst.
|
||||
|
||||
Callers (handle_status_start) already guarantee no existing task for this
|
||||
plane_id, so this never duplicates.
|
||||
"""
|
||||
plane_id = data.get("id", "")
|
||||
name = data.get("name", "untitled")
|
||||
description = data.get("description_stripped", data.get("description", ""))
|
||||
|
||||
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
|
||||
# the single hardcoded default_repo.
|
||||
if not project_id:
|
||||
project_id = data.get("project") or data.get("project_id") or ""
|
||||
proj = get_project_by_plane_id(project_id)
|
||||
if not proj:
|
||||
logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}")
|
||||
return
|
||||
repo = proj.repo
|
||||
plane_project_id = proj.plane_project_id
|
||||
|
||||
# BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress)
|
||||
# sends only the CHANGED fields, so BOTH description / description_stripped
|
||||
# AND name are usually empty here even though the issue HAS them. Pull the
|
||||
# full title + description from the Plane issue detail API in a SINGLE GET
|
||||
# (fetch_issue_fields: same endpoint + shared token already used by
|
||||
# fetch_issue_sequence_id) before QG-0 and before the branch slug is built.
|
||||
# If the API is also empty, QG-0 legitimately fails (truly empty ticket) and
|
||||
# name falls back to "untitled".
|
||||
name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3
|
||||
desc_missing = (not description) or len(description.strip()) < 20
|
||||
if name_missing or desc_missing:
|
||||
from ..plane_sync import fetch_issue_fields
|
||||
fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id)
|
||||
if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()):
|
||||
description = fetched_desc
|
||||
logger.info(
|
||||
f"start_pipeline: pulled description from Plane API for {plane_id} "
|
||||
f"({len(description.strip())} chars)"
|
||||
)
|
||||
if name_missing and fetched_name and len(fetched_name.strip()) >= 3:
|
||||
name = fetched_name
|
||||
logger.info(
|
||||
f"start_pipeline: pulled name from Plane API for {plane_id} "
|
||||
f"('{name}')"
|
||||
)
|
||||
# BUG B fallback: if name is still empty/blank after the API pull, keep the
|
||||
# legacy "untitled" so the slug/branch build never crashes on an empty name.
|
||||
if not name or not name.strip():
|
||||
name = "untitled"
|
||||
|
||||
# QG-0 validation (hard gate on pipeline start)
|
||||
errors = _qg0_errors(name, description)
|
||||
if errors:
|
||||
# QG-0 failed
|
||||
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
|
||||
@@ -169,10 +456,41 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
f"fell back to DB increment: {work_item_id}"
|
||||
)
|
||||
|
||||
# BUG 2a: uniqueness-guard LAYERED ON TOP of the M-6 derive above (the derive
|
||||
# itself is untouched). If the derived ET-NNN is already taken by another
|
||||
# task in this repo (collision -> two tasks would share branch/worktree, see
|
||||
# ET-006), bump to the next free number.
|
||||
_derived = work_item_id
|
||||
work_item_id = ensure_unique_work_item_id(work_item_id, repo)
|
||||
if work_item_id != _derived:
|
||||
logger.warning(
|
||||
f"work_item_id collision: derived {_derived} already in use for "
|
||||
f"{repo}; reassigned {plane_id} -> {work_item_id}"
|
||||
)
|
||||
|
||||
# Create slug from name
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
||||
branch = f"feature/{work_item_id}-{slug}"
|
||||
|
||||
# BUG 2b (defense-in-depth): the worktree/path is keyed by BRANCH
|
||||
# (git_worktree.get_worktree_path) and tasks are reverse-resolved by
|
||||
# (repo, branch). With 2a the work_item_id is unique, so the branch prefix is
|
||||
# too; but the slug could still collide (e.g. two issues with the same title
|
||||
# under different ids -> fine) or, worse, an identical branch already exist.
|
||||
# Guard physically: if this exact branch is already owned by another task in
|
||||
# this repo, disambiguate with the (now unique) work_item_id so two tasks can
|
||||
# never share a worktree.
|
||||
_conn_b = get_db()
|
||||
_branch_taken = _conn_b.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo = ? AND branch = ? LIMIT 1", (repo, branch)
|
||||
).fetchone()
|
||||
_conn_b.close()
|
||||
if _branch_taken is not None:
|
||||
branch = f"feature/{work_item_id}-{plane_id[:8]}"
|
||||
logger.warning(
|
||||
f"branch collision for {repo}; disambiguated to unique branch {branch}"
|
||||
)
|
||||
|
||||
# Insert task into DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
@@ -204,133 +522,104 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
|
||||
if task_row:
|
||||
task_id = task_row[0]
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
|
||||
)
|
||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
||||
|
||||
|
||||
async def handle_comment(data: dict, project_id: str = ""):
|
||||
"""Status-only verdict model: comments NEVER drive the pipeline.
|
||||
|
||||
The whole comment-based control mechanism (``:approved:`` / ``:rejected:``
|
||||
and the analysis answer-to-questions flow) was removed. It caused bug 3
|
||||
(echo self-hit): the analyst posts its own "waiting for approval" comment,
|
||||
handle_comment catches its own comment and reverts In Review -> In Progress.
|
||||
|
||||
Comments are now logged only — no status change, no enqueue, no side effect.
|
||||
The pipeline is driven solely by status changes (handle_issue_updated):
|
||||
- Approved -> advance
|
||||
- Rejected -> rollback (reason pulled from the latest comment)
|
||||
- In Progress (returned from Needs Input) -> relaunch the stage agent
|
||||
"""
|
||||
Handle comment event — check for :approved: or :rejected:.
|
||||
Advance or rollback stage accordingly.
|
||||
plane_id = str(
|
||||
data.get("work_item_id") or data.get("issue_id") or data.get("issue") or ""
|
||||
)
|
||||
logger.info(
|
||||
f"comment.created for {plane_id}: logged only, no pipeline action "
|
||||
f"(status-only verdict model)"
|
||||
)
|
||||
|
||||
|
||||
async def _rollback_stage(
|
||||
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
|
||||
reason: str,
|
||||
):
|
||||
"""Rollback triggered by a status change to Rejected.
|
||||
|
||||
- at analysis: relaunch the analyst with the rejection reason;
|
||||
- otherwise: roll back to the previous stage and relaunch its agent
|
||||
(via the existing rollback notify + an enqueue of the prev-stage agent).
|
||||
"""
|
||||
comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", ""))))
|
||||
plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "")
|
||||
|
||||
if not plane_id:
|
||||
logger.warning("Comment event without work_item_id, skipping")
|
||||
return
|
||||
|
||||
task = get_task_by_plane_id(plane_id)
|
||||
if not task:
|
||||
logger.warning(f"No task found for plane_id={plane_id}")
|
||||
return
|
||||
|
||||
task_id = task["id"]
|
||||
current_stage = task["stage"]
|
||||
repo = task["repo"]
|
||||
work_item_id = task.get("work_item_id", "")
|
||||
branch = task.get("branch", "")
|
||||
|
||||
if ":rejected:" in comment_body:
|
||||
# Extract reason (text after :rejected:)
|
||||
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
|
||||
|
||||
if current_stage == "analysis":
|
||||
# Already in analysis — just relaunch analyst with rejection reason
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
||||
f"Reason: {reason}\nRevise and improve."
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||
else:
|
||||
# Rollback to previous stage
|
||||
prev_stage = get_previous_stage(current_stage)
|
||||
if prev_stage:
|
||||
update_task_stage(task_id, prev_stage)
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
notify_stage_change(task_id, current_stage, prev_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, prev_stage)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}")
|
||||
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
|
||||
return
|
||||
|
||||
if ":approved:" in comment_body:
|
||||
if current_stage == "analysis":
|
||||
# Already in analysis — just relaunch analyst with rejection reason
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
# Try to advance stage
|
||||
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
||||
f"Reason: {reason}\nRevise and improve."
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
|
||||
# Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions
|
||||
if current_stage == "analysis":
|
||||
from ..plane_sync import PLANE_STATES, set_issue_in_progress
|
||||
issue_id = task.get("plane_issue_id") or task.get("plane_id")
|
||||
if not issue_id:
|
||||
issue_id = plane_id
|
||||
if issue_id:
|
||||
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE
|
||||
from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID
|
||||
# ORCH-6: route to this task's own Plane project (resolved from repo).
|
||||
_proj = get_project_by_repo(repo)
|
||||
_pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
|
||||
import httpx as _httpx
|
||||
try:
|
||||
_resp = _httpx.get(
|
||||
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/",
|
||||
headers=PLANE_HEADERS, timeout=10
|
||||
)
|
||||
if _resp.status_code == 200:
|
||||
issue_data = _resp.json()
|
||||
if issue_data.get("state") == PLANE_STATES["needs_input"]:
|
||||
# Task 11: Check analyst retry count (max 3 question rounds)
|
||||
conn3 = get_db()
|
||||
analyst_runs = conn3.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn3.close()
|
||||
|
||||
if analyst_runs >= 4: # initial + 3 retries
|
||||
from ..plane_sync import set_issue_blocked, add_comment as _pc
|
||||
set_issue_blocked(work_item_id)
|
||||
_pc(
|
||||
work_item_id,
|
||||
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
|
||||
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430."
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
|
||||
return
|
||||
|
||||
# This is an answer to analyst's questions — relaunch
|
||||
set_issue_in_progress(work_item_id)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Stakeholder answered your questions. "
|
||||
f"Read the latest comment in Plane and revise your artifacts.\n"
|
||||
f"Answer: {comment_body[:500]}"
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _pc2
|
||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check issue state: {e}")
|
||||
# Rollback to previous stage
|
||||
prev_stage = get_previous_stage(current_stage)
|
||||
if not prev_stage:
|
||||
logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage")
|
||||
return
|
||||
update_task_stage(task_id, prev_stage)
|
||||
notify_stage_change(task_id, current_stage, prev_stage)
|
||||
# Feature 3: plane_notify_stage moves the board to the prev stage's status.
|
||||
plane_notify_stage(work_item_id, current_stage, prev_stage)
|
||||
# Then put it back to In Progress so the relaunched agent is clearly working.
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS
|
||||
_plane_comment(
|
||||
work_item_id,
|
||||
f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}",
|
||||
author=STAGE_AUTHORS.get(prev_stage, "stream"),
|
||||
)
|
||||
# Relaunch the previous stage's agent so the rollback actually re-runs work.
|
||||
# STAGE_AUTHORS maps a stage directly to the role that OWNS work in it
|
||||
# (analysis->analyst, architecture->architect, ...), which is exactly the
|
||||
# agent we must re-run on a rollback into prev_stage.
|
||||
from ..plane_sync import STAGE_AUTHORS as _STAGE_AUTHORS
|
||||
prev_agent = _STAGE_AUTHORS.get(prev_stage)
|
||||
if prev_agent:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n"
|
||||
f"Revise and improve."
|
||||
)
|
||||
new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(
|
||||
f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, "
|
||||
f"enqueued {prev_agent} (job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}")
|
||||
|
||||
|
||||
async def _try_advance_stage(
|
||||
@@ -343,10 +632,10 @@ async def _try_advance_stage(
|
||||
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
||||
is exactly one implementation shared with the launcher.
|
||||
|
||||
finished_agent is None on this webhook path (a human :approved: comment, not
|
||||
a finished agent), so the agent-specific rollback branches inside the engine
|
||||
intentionally do not trigger — identical to the old plane behavior, which
|
||||
only ran the QG and either advanced or reported the failure.
|
||||
finished_agent is None on this webhook path (a human Approved status change,
|
||||
not a finished agent), so the agent-specific rollback branches inside the
|
||||
engine intentionally do not trigger — the webhook path only runs the QG and
|
||||
either advances or reports the failure.
|
||||
"""
|
||||
import asyncio
|
||||
from ..stage_engine import advance_stage
|
||||
|
||||
40
tests/conftest.py
Normal file
40
tests/conftest.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Global pytest fixtures.
|
||||
|
||||
test(conftest): mute Telegram in ALL tests to stop prod leakage.
|
||||
|
||||
Background: a pytest run on prod was sending REAL Telegram messages to Slava,
|
||||
because some tests (e.g. test_webhook_dedup advancing a stage) reach
|
||||
notify_stage_change -> send_telegram, which reads the live .env
|
||||
telegram_bot_token/chat_id and actually POSTs to Telegram.
|
||||
|
||||
This autouse fixture stubs send_telegram to a no-op for every test:
|
||||
|
||||
- "src.notifications.send_telegram" is the SOURCE. All the notify_* helpers in
|
||||
notifications.py call the module-global send_telegram, and every other module
|
||||
that does a *local* `from .notifications import send_telegram` inside a
|
||||
function resolves it live at call time -> covered by patching the source.
|
||||
|
||||
- "src.stage_engine.send_telegram" is patched too, because stage_engine binds
|
||||
send_telegram as a MODULE-LEVEL name (from .notifications import send_telegram
|
||||
at import), so a patch of the source alone would not intercept its 3 direct
|
||||
calls. webhooks/plane and launcher import it locally inside functions, so the
|
||||
source patch already covers them; they are patched defensively with
|
||||
raising=False anyway in case that ever changes.
|
||||
|
||||
raising=False so a module that doesn't (yet) expose the name never breaks setup.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _no_telegram(monkeypatch):
|
||||
_noop = lambda *a, **k: None # noqa: E731
|
||||
# Source of truth (covers notifications.notify_* and all local re-imports).
|
||||
monkeypatch.setattr("src.notifications.send_telegram", _noop, raising=False)
|
||||
# Module-level binding in stage_engine (and defensive coverage elsewhere).
|
||||
monkeypatch.setattr("src.stage_engine.send_telegram", _noop, raising=False)
|
||||
monkeypatch.setattr("src.webhooks.plane.send_telegram", _noop, raising=False)
|
||||
monkeypatch.setattr("src.agents.launcher.send_telegram", _noop, raising=False)
|
||||
monkeypatch.setattr("src.queue_worker.send_telegram", _noop, raising=False)
|
||||
yield
|
||||
74
tests/test_analyst_comment.py
Normal file
74
tests/test_analyst_comment.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""BUG C: analyst "artifacts ready" comment under the status-only model.
|
||||
|
||||
The comment must ask for the **Approved** status (not the obsolete
|
||||
":approved:" reaction, not moving back to "In Progress") and link only the
|
||||
docs that actually exist in the worktree.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
|
||||
def test_analyst_comment_asks_approved_with_links(monkeypatch, tmp_path):
|
||||
from src import stage_engine as SE
|
||||
|
||||
# Worktree with only SOME of the candidate docs present.
|
||||
wt = tmp_path / "wt"
|
||||
docs = wt / "docs" / "work-items" / "ET-011"
|
||||
docs.mkdir(parents=True)
|
||||
for fname in ("00-business-request.md", "01-brd.md", "02-trz.md",
|
||||
"03-acceptance-criteria.md", "04-test-plan.yaml"):
|
||||
(docs / fname).write_text("x")
|
||||
# 04b-ui-test-cases.md intentionally absent -> must NOT be linked
|
||||
|
||||
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
# public URL set -> links must be built from it (not gitea_url)
|
||||
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
|
||||
monkeypatch.setattr(SE.settings, "gitea_public_url", "https://git.mva154.duckdns.org")
|
||||
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
|
||||
|
||||
html = SE._build_analyst_ready_comment(
|
||||
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
|
||||
)
|
||||
|
||||
# text asks for the Approved STATUS, not the obsolete mechanisms
|
||||
assert "Approved" in html
|
||||
assert ":approved:" not in html
|
||||
assert "In Progress" not in html
|
||||
assert "Rejected" in html
|
||||
# clickable links to docs that ACTUALLY exist
|
||||
assert "<a href=" in html
|
||||
base = ("https://git.mva154.duckdns.org/admin/enduro-trails/src/branch/"
|
||||
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
|
||||
assert base + "01-brd.md" in html
|
||||
assert base + "04-test-plan.yaml" in html
|
||||
# the missing file is NOT invented
|
||||
assert "04b-ui-test-cases.md" not in html
|
||||
# internal git url must NOT appear in clickable links
|
||||
assert "localhost:3000" not in html
|
||||
|
||||
|
||||
def test_analyst_comment_falls_back_to_gitea_url(monkeypatch, tmp_path):
|
||||
"""When gitea_public_url is empty, links fall back to gitea_url."""
|
||||
from src import stage_engine as SE
|
||||
|
||||
wt = tmp_path / "wt"
|
||||
docs = wt / "docs" / "work-items" / "ET-011"
|
||||
docs.mkdir(parents=True)
|
||||
(docs / "01-brd.md").write_text("x")
|
||||
|
||||
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
|
||||
monkeypatch.setattr(SE.settings, "gitea_public_url", "")
|
||||
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
|
||||
|
||||
html = SE._build_analyst_ready_comment(
|
||||
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
|
||||
)
|
||||
|
||||
base = ("http://localhost:3000/admin/enduro-trails/src/branch/"
|
||||
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
|
||||
assert base + "01-brd.md" in html
|
||||
@@ -102,16 +102,22 @@ def test_fetch_sequence_id_missing_field_returns_none():
|
||||
# handle_work_item_created: seq available -> prefix-NNN
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Feature 1: pipeline starts on a status change to In Progress, not on creation.
|
||||
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
|
||||
|
||||
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
|
||||
return client.post(
|
||||
"/webhook/plane",
|
||||
json={
|
||||
"event": "work_item.created",
|
||||
"event": "issue",
|
||||
"action": "updated",
|
||||
"data": {
|
||||
"id": plane_id,
|
||||
"name": name,
|
||||
"description_stripped": "This is a sufficiently long description.",
|
||||
"project": plane_project_id,
|
||||
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
213
tests/test_pipeline_start_bugs.py
Normal file
213
tests/test_pipeline_start_bugs.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""Tests for the two pipeline-start bugs surfaced by the ET-006 live run.
|
||||
|
||||
BUG 1: issue.updated (status -> In Progress) ships a payload WITHOUT the
|
||||
description, so start_pipeline must pull it from the Plane issue API
|
||||
before QG-0 runs (otherwise QG-0 wrongly blocks the issue).
|
||||
|
||||
BUG 2a: M-6 derives work_item_id from the Plane sequence_id, which can collide.
|
||||
ensure_unique_work_item_id() must hand out the next FREE id instead of
|
||||
reusing one that is already in the tasks table.
|
||||
|
||||
BUG 2b: two tasks with an (artificially) identical work_item_id must not share a
|
||||
branch/worktree.
|
||||
|
||||
launcher / Gitea / Plane network are mocked. Real FastAPI endpoint via
|
||||
TestClient for the BUG 1 end-to-end path.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_pipeline_bugs.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db, ensure_unique_work_item_id # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
from src.git_worktree import get_worktree_path # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _insert_task(work_item_id, branch, plane_id="x"):
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, "enduro-trails", branch, "analysis", plane_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def _count(plane_id):
|
||||
conn = get_db()
|
||||
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def _task(plane_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
|
||||
conn.close()
|
||||
return row
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 1
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _to_in_progress_no_desc(plane_id="bug1"):
|
||||
"""issue.updated payload WITHOUT description (only changed fields)."""
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": plane_id, "name": "A valid backlog item title",
|
||||
# NO description / description_stripped here, exactly like Plane sends
|
||||
# on a status change.
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||
@patch("src.plane_sync.fetch_issue_fields",
|
||||
return_value=("A valid backlog item title",
|
||||
"This is a sufficiently long description fetched from Plane API."))
|
||||
def test_status_start_fetches_description(
|
||||
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
|
||||
Plane API (single fetch_issue_fields GET) -> QG-0 passes -> task created +
|
||||
analyst enqueued (NOT blocked)."""
|
||||
resp = _to_in_progress_no_desc("bug1")
|
||||
assert resp.status_code == 200
|
||||
# name + description were pulled from the API in one call
|
||||
mock_fields.assert_called_once()
|
||||
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
|
||||
assert _count("bug1") == 1
|
||||
assert _task("bug1")["stage"] == "analysis"
|
||||
mock_enqueue.assert_called_once()
|
||||
assert mock_enqueue.call_args.args[0] == "analyst"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||
@patch("src.plane_sync.fetch_issue_fields", return_value=("", ""))
|
||||
def test_status_start_empty_api_still_blocks(
|
||||
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
|
||||
fails -> NO task is created (truly empty ticket)."""
|
||||
resp = _to_in_progress_no_desc("bug1-empty")
|
||||
assert resp.status_code == 200
|
||||
mock_fields.assert_called_once()
|
||||
assert _count("bug1-empty") == 0
|
||||
mock_enqueue.assert_not_called()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 2a
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_work_item_id_uniqueness():
|
||||
"""BUG 2a: if ET-006 is already in tasks, the guard returns the next free
|
||||
id (ET-007), not ET-006 again."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="old")
|
||||
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-007"
|
||||
|
||||
# ET-006 AND ET-007 taken -> next free is ET-008.
|
||||
_insert_task("ET-007", "feature/ET-007-something", plane_id="old2")
|
||||
assert ensure_unique_work_item_id("ET-006", "enduro-trails") == "ET-008"
|
||||
|
||||
# A free id is returned unchanged.
|
||||
assert ensure_unique_work_item_id("ET-099", "enduro-trails") == "ET-099"
|
||||
|
||||
# Per-repo isolation: a different repo with the same id is not a collision.
|
||||
assert ensure_unique_work_item_id("ET-006", "other-repo") == "ET-006"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
|
||||
@patch("src.plane_sync.fetch_issue_fields",
|
||||
return_value=("Popup enduro trails feature",
|
||||
"A sufficiently long description for QG-0 to pass cleanly."))
|
||||
def test_collision_reassigns_in_start_pipeline(
|
||||
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
|
||||
Plane sequence_id is also 6 must NOT reuse ET-006."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "task25", "name": "Popup enduro trails feature",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
new_id = _task("task25")["work_item_id"]
|
||||
assert new_id != "ET-006"
|
||||
assert new_id == "ET-007"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG 2b
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_worktree_per_task():
|
||||
"""BUG 2b: two tasks must not resolve to the same worktree path. With the
|
||||
uniqueness guard the branches differ, so the worktree paths differ too."""
|
||||
_insert_task("ET-006", "feature/ET-006-gpx-upload", plane_id="task8")
|
||||
# The second task gets a unique id via the guard...
|
||||
new_id = ensure_unique_work_item_id("ET-006", "enduro-trails")
|
||||
assert new_id == "ET-007"
|
||||
branch_a = "feature/ET-006-gpx-upload"
|
||||
branch_b = f"feature/{new_id}-popup-enduro-trails"
|
||||
|
||||
wt_a = get_worktree_path("enduro-trails", branch_a)
|
||||
wt_b = get_worktree_path("enduro-trails", branch_b)
|
||||
assert wt_a != wt_b, "two tasks must not share a worktree path"
|
||||
99
tests/test_plane_author.py
Normal file
99
tests/test_plane_author.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Tests for per-agent Plane comment authorship (feat: per-agent bot author).
|
||||
|
||||
Covers:
|
||||
* _headers_for: role -> bot token; None/unknown/empty token -> shared fallback.
|
||||
* add_comment: author is propagated into the POST headers; no author keeps
|
||||
backward-compatible behaviour (shared orchestrator token).
|
||||
|
||||
GET/PATCH calls are intentionally NOT covered here: they stay on the shared
|
||||
token by design and are unchanged by this feature.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
# Set env defaults before importing app modules (same convention as the other
|
||||
# suites) so config/settings load cleanly without a real .env.
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "shared-token")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import patch, MagicMock # noqa: E402
|
||||
|
||||
from src import plane_sync # noqa: E402
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# _headers_for
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_headers_for_known_role_uses_bot_token():
|
||||
"""A known role with a configured token -> that bot's X-API-Key."""
|
||||
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"analyst": "analyst-tok"}, clear=False):
|
||||
assert plane_sync._headers_for("analyst") == {"X-API-Key": "analyst-tok"}
|
||||
|
||||
|
||||
def test_headers_for_none_falls_back_to_shared():
|
||||
"""author=None -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for(None) is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_unknown_role_falls_back_to_shared():
|
||||
"""Unknown role -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for("nope") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_empty_token_falls_back_to_shared():
|
||||
"""Known role but empty/unconfigured token -> shared orchestrator headers."""
|
||||
with patch.dict(plane_sync.PLANE_BOT_TOKENS, {"tester": ""}, clear=False):
|
||||
assert plane_sync._headers_for("tester") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_headers_for_empty_string_author_falls_back_to_shared():
|
||||
"""author='' -> shared orchestrator headers."""
|
||||
assert plane_sync._headers_for("") is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# add_comment
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _mock_post_ok():
|
||||
resp = MagicMock()
|
||||
resp.raise_for_status.return_value = None
|
||||
return resp
|
||||
|
||||
|
||||
def test_add_comment_with_author_posts_with_bot_headers():
|
||||
"""add_comment(author='developer') -> httpx.post called with the developer
|
||||
bot's X-API-Key header."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.dict(plane_sync.PLANE_BOT_TOKENS, {"developer": "dev-tok"}, clear=False), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello", author="developer")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] == {"X-API-Key": "dev-tok"}
|
||||
|
||||
|
||||
def test_add_comment_without_author_uses_shared_token():
|
||||
"""add_comment without author -> shared orchestrator headers (backward
|
||||
compatible)."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
|
||||
|
||||
|
||||
def test_add_comment_unknown_author_uses_shared_token():
|
||||
"""add_comment with an unknown role -> shared orchestrator headers."""
|
||||
with patch.object(plane_sync, "find_issue_id", return_value="issue-uuid"), \
|
||||
patch.object(plane_sync, "_resolve_project_id", return_value="proj-uuid"), \
|
||||
patch.object(plane_sync.httpx, "post", return_value=_mock_post_ok()) as mock_post:
|
||||
plane_sync.add_comment("ET-001", "hello", author="ghost")
|
||||
|
||||
assert mock_post.called
|
||||
_, kwargs = mock_post.call_args
|
||||
assert kwargs["headers"] is plane_sync.PLANE_HEADERS
|
||||
@@ -73,16 +73,24 @@ def setup(monkeypatch):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
# Feature 1: the pipeline now starts on a status change to In Progress (not on
|
||||
# creation). _post_created drives that status-change event so these ORCH-6
|
||||
# routing tests still exercise task creation through the new trigger.
|
||||
_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
|
||||
|
||||
def _post_created(plane_project_id, plane_id="wi-1", name="A valid work item title"):
|
||||
return client.post(
|
||||
"/webhook/plane",
|
||||
json={
|
||||
"event": "work_item.created",
|
||||
"event": "issue",
|
||||
"action": "updated",
|
||||
"data": {
|
||||
"id": plane_id,
|
||||
"name": name,
|
||||
"description_stripped": "This is a sufficiently long description.",
|
||||
"project": plane_project_id,
|
||||
"state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -17,7 +17,9 @@ from src.qg.checks import (
|
||||
check_ci_green,
|
||||
check_review_approved,
|
||||
check_tests_passed,
|
||||
check_tests_local,
|
||||
)
|
||||
from src.stages import get_qg_for_stage
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -186,3 +188,57 @@ class TestCheckTestsPassed:
|
||||
passed, reason = check_tests_passed("enduro-trails", "ET-001")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
|
||||
class TestDevelopmentStageQG:
|
||||
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
|
||||
gate), not the deprecated check_tests_local."""
|
||||
|
||||
def test_development_qg_is_check_ci_green(self):
|
||||
assert get_qg_for_stage("development") == "check_ci_green"
|
||||
|
||||
def test_check_tests_local_is_deprecated_and_unwired(self):
|
||||
# Kept in the registry for backward-compat, but not wired to any stage.
|
||||
from src.qg.checks import QG_CHECKS
|
||||
from src.stages import STAGE_TRANSITIONS
|
||||
assert "check_tests_local" in QG_CHECKS
|
||||
wired = {t.get("qg") for t in STAGE_TRANSITIONS.values()}
|
||||
assert "check_tests_local" not in wired
|
||||
|
||||
|
||||
class TestCheckTestsLocal:
|
||||
"""BUG 5: check_tests_local must run pytest directly (not make, which is
|
||||
not installed in the orchestrator container)."""
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_passes_on_returncode_zero(self, mock_run, mock_wt, tmp_path):
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
|
||||
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
assert passed is True
|
||||
assert reason == "Local tests passed"
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_fails_on_nonzero_returncode(self, mock_run, mock_wt, tmp_path):
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="boom", stderr="trace")
|
||||
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
assert passed is False
|
||||
assert "Local tests failed" in reason
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_invokes_pytest_not_make(self, mock_run, mock_wt, tmp_path):
|
||||
"""The subprocess call must be pytest, from src/api, against ../../tests/."""
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
|
||||
check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
args, kwargs = mock_run.call_args
|
||||
cmd = args[0]
|
||||
assert "make" not in cmd
|
||||
assert cmd[:3] == ["python", "-m", "pytest"]
|
||||
assert "../../tests/" in cmd
|
||||
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")
|
||||
|
||||
|
||||
@@ -203,10 +203,13 @@ class TestQgFailureDoesNotAdvance:
|
||||
assert _jobs() == []
|
||||
|
||||
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity).
|
||||
|
||||
development stage QG is now check_ci_green (was check_tests_local).
|
||||
"""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
||||
{**stage_engine.QG_CHECKS, "check_ci_green": _fail("ci red")},
|
||||
)
|
||||
task_id = _make_task("development")
|
||||
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||
@@ -358,6 +361,63 @@ class TestAnalysisApprovedFlow:
|
||||
assert stage_engine.notify_approve_requested.called
|
||||
assert _jobs() == []
|
||||
|
||||
def test_approved_verdict_advances_analysis_to_architecture(self, monkeypatch):
|
||||
"""BUG 4: a human Approved STATUS (webhook path, finished_agent=None)
|
||||
must satisfy the analysis gate and advance analysis -> architecture,
|
||||
enqueuing the architect. The status-only approval must NOT re-run
|
||||
check_analysis_approved (which looks for an :approved: COMMENT and would
|
||||
otherwise wrongly block the advance).
|
||||
"""
|
||||
# Make check_analysis_approved FAIL if it is ever called: the webhook
|
||||
# path must bypass it entirely (status == approval). If the engine were
|
||||
# to re-run the gate, this would block the advance and fail the test.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{
|
||||
**stage_engine.QG_CHECKS,
|
||||
"check_analysis_approved": _fail("no :approved: comment"),
|
||||
},
|
||||
)
|
||||
# Guard: the approval-flow (launcher-only) must NOT be invoked here.
|
||||
flow = MagicMock()
|
||||
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(
|
||||
task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == "architecture"
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert res.enqueued_agent == "architect"
|
||||
# Sanity: agent for analysis is architect, never analyst (no re-run loop).
|
||||
assert get_agent_for_stage("analysis") == "architect"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "architect"
|
||||
# The launcher-only approval-flow was NOT called on the webhook path.
|
||||
flow.assert_not_called()
|
||||
|
||||
def test_launcher_path_does_not_advance_and_calls_flow(self, monkeypatch):
|
||||
"""Regression: the launcher path (finished_agent='analyst') still routes
|
||||
into _handle_analysis_approved_flow and does NOT advance.
|
||||
"""
|
||||
flow = MagicMock()
|
||||
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(
|
||||
task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="analyst",
|
||||
)
|
||||
|
||||
assert res.advanced is not True
|
||||
assert _stage(task_id) == "analysis"
|
||||
assert _jobs() == []
|
||||
flow.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# launcher + plane both delegate to the engine
|
||||
|
||||
94
tests/test_stage_visibility.py
Normal file
94
tests/test_stage_visibility.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Feature 3: stage visibility on the Plane board.
|
||||
|
||||
* PLANE_STATES carries the 6 new per-stage / verdict UUIDs.
|
||||
* STAGE_TO_STATE maps architecture/development/review/testing to their
|
||||
dedicated board statuses (not all -> In Progress anymore).
|
||||
* set_issue_stage_state(work_item_id, stage) PATCHes the correct state UUID
|
||||
for a visible stage, and is a no-op for stages without one (analysis/deploy).
|
||||
* Needs Input / In Review / Blocked remain higher priority: their explicit
|
||||
setters use their own state, never overwritten by the stage map.
|
||||
|
||||
httpx is mocked; no network.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import patch, MagicMock # noqa: E402
|
||||
|
||||
from src import plane_sync as PS # noqa: E402
|
||||
|
||||
|
||||
EXPECTED_UUIDS = {
|
||||
"architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d",
|
||||
"development": "9920609b-f140-4e46-ab95-89acda8412c8",
|
||||
"review": "ba0d802c-5218-41d4-ab43-978b0ea123ed",
|
||||
"testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02",
|
||||
"approved": "a519a341-dada-4a91-8910-7604f82b79c5",
|
||||
"rejected": "ba958f3c-5db5-461d-8f82-89425e413b97",
|
||||
}
|
||||
|
||||
|
||||
def test_plane_states_has_new_uuids():
|
||||
for key, uuid in EXPECTED_UUIDS.items():
|
||||
assert PS.PLANE_STATES[key] == uuid
|
||||
|
||||
|
||||
def test_stage_to_state_maps_visible_stages():
|
||||
assert PS.STAGE_TO_STATE["architecture"] == EXPECTED_UUIDS["architecture"]
|
||||
assert PS.STAGE_TO_STATE["development"] == EXPECTED_UUIDS["development"]
|
||||
assert PS.STAGE_TO_STATE["review"] == EXPECTED_UUIDS["review"]
|
||||
assert PS.STAGE_TO_STATE["testing"] == EXPECTED_UUIDS["testing"]
|
||||
# analysis / deploy stay on In Progress; done stays Done.
|
||||
assert PS.STAGE_TO_STATE["analysis"] == PS.PLANE_STATES["in_progress"]
|
||||
assert PS.STAGE_TO_STATE["deploy"] == PS.PLANE_STATES["in_progress"]
|
||||
assert PS.STAGE_TO_STATE["done"] == PS.PLANE_STATES["done"]
|
||||
|
||||
|
||||
def _patch_resolution(monkey_targets):
|
||||
"""Helper: patch find_issue_id + _resolve_project_id to skip the DB/network."""
|
||||
return monkey_targets
|
||||
|
||||
|
||||
@patch("src.plane_sync.httpx.patch")
|
||||
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
|
||||
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
|
||||
def test_set_issue_stage_state_patches_correct_uuid(mock_proj, mock_find, mock_patch):
|
||||
resp = MagicMock(); resp.raise_for_status.return_value = None
|
||||
mock_patch.return_value = resp
|
||||
|
||||
PS.set_issue_stage_state("ET-1", "development")
|
||||
# the PATCH carried the development state UUID
|
||||
_, kwargs = mock_patch.call_args
|
||||
assert kwargs["json"]["state"] == EXPECTED_UUIDS["development"]
|
||||
|
||||
|
||||
@patch("src.plane_sync.httpx.patch")
|
||||
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
|
||||
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
|
||||
def test_set_issue_stage_state_noop_for_analysis(mock_proj, mock_find, mock_patch):
|
||||
# analysis has no dedicated board status -> no PATCH at all.
|
||||
PS.set_issue_stage_state("ET-1", "analysis")
|
||||
mock_patch.assert_not_called()
|
||||
PS.set_issue_stage_state("ET-1", "deploy")
|
||||
mock_patch.assert_not_called()
|
||||
|
||||
|
||||
@patch("src.plane_sync.httpx.patch")
|
||||
@patch("src.plane_sync.find_issue_id", return_value="issue-uuid")
|
||||
@patch("src.plane_sync._resolve_project_id", return_value="proj-1")
|
||||
def test_priority_states_use_their_own_uuid(mock_proj, mock_find, mock_patch):
|
||||
"""Needs Input / In Review / Blocked are set explicitly and take priority."""
|
||||
resp = MagicMock(); resp.raise_for_status.return_value = None
|
||||
mock_patch.return_value = resp
|
||||
|
||||
PS.set_issue_needs_input("ET-1")
|
||||
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["needs_input"]
|
||||
|
||||
PS.set_issue_in_review("ET-1")
|
||||
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["in_review"]
|
||||
|
||||
PS.set_issue_blocked("ET-1")
|
||||
assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["blocked"]
|
||||
200
tests/test_status_only_verdict.py
Normal file
200
tests/test_status_only_verdict.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Status-only verdict model (bug 3 fix).
|
||||
|
||||
The comment-based control mechanism (:approved: / :rejected: / answer-to-questions)
|
||||
was removed. The pipeline is driven SOLELY by Plane status changes. These tests
|
||||
lock in the new behaviour:
|
||||
|
||||
* test_inreview_comment_does_not_revert — bug 3 root: an In Review task,
|
||||
any comment arrives -> status NOT reverted, no agent launched.
|
||||
* test_any_comment_no_pipeline_action — :approved: / :rejected: / plain
|
||||
text comment -> no status change, no enqueue.
|
||||
* test_approved_status_advances_without_inprogress_reset — Approved status
|
||||
advances WITHOUT an intermediate set_issue_in_progress reset.
|
||||
* test_rejected_status_pulls_reason_from_comment — Rejected status pulls the
|
||||
reason from the issue's latest comment (mocked GET comments).
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_only.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5"
|
||||
REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97"
|
||||
IN_REVIEW = "38fb1f64-aa1e-48a3-92e0-0b109679046b"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
# Seed a task at the 'review' stage for plane_id 'r-1'.
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("r-1", "ET-700", "enduro-trails", "feature/ET-700-x", "review", "r-1"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
def __init__(self, status_code, payload):
|
||||
self.status_code = status_code
|
||||
self._payload = payload
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
def _comment(text, plane_id="r-1"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue_comment", "action": "created",
|
||||
"data": {"work_item_id": plane_id, "comment_stripped": text,
|
||||
"project": ENDURO_PLANE_ID},
|
||||
})
|
||||
|
||||
|
||||
def _status(state_id, plane_id="r-1", old="prev"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": plane_id, "name": "Status task", "project": ENDURO_PLANE_ID,
|
||||
"state": {"id": state_id, "name": "X", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": state_id, "old_value": old},
|
||||
})
|
||||
|
||||
|
||||
def _stage(plane_id="r-1"):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT stage FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
|
||||
conn.close()
|
||||
return row[0] if row else None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Bug 3 root: In Review must not revert on a comment.
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.plane_sync._set_issue_state_direct")
|
||||
@patch("src.plane_sync.update_issue_state")
|
||||
def test_inreview_comment_does_not_revert(
|
||||
mock_update_state, mock_set_direct, mock_sip, mock_enqueue
|
||||
):
|
||||
"""Bug 3: task in In Review, ANY comment arrives -> status NOT reverted to
|
||||
In Progress, NO agent launched. The analyst's own 'waiting for approval'
|
||||
comment used to echo back and self-hit -> reverted In Review -> In Progress.
|
||||
"""
|
||||
# analyst's own echo comment
|
||||
resp = _comment("Готово, жду approved")
|
||||
assert resp.status_code == 200
|
||||
# no status changes whatsoever
|
||||
mock_sip.assert_not_called()
|
||||
mock_set_direct.assert_not_called()
|
||||
mock_update_state.assert_not_called()
|
||||
# no agent launched
|
||||
mock_enqueue.assert_not_called()
|
||||
# stage untouched
|
||||
assert _stage() == "review"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Any comment -> zero pipeline side-effects.
|
||||
# --------------------------------------------------------------------------- #
|
||||
@pytest.mark.parametrize("text", [":approved:", ":rejected: bad", "plain text", ""])
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.plane_sync._set_issue_state_direct")
|
||||
def test_any_comment_no_pipeline_action(
|
||||
mock_set_direct, mock_sip, mock_rollback, mock_advance, mock_enqueue, text
|
||||
):
|
||||
resp = _comment(text)
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_not_called()
|
||||
mock_rollback.assert_not_called()
|
||||
mock_sip.assert_not_called()
|
||||
mock_set_direct.assert_not_called()
|
||||
mock_enqueue.assert_not_called()
|
||||
assert _stage() == "review"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Approved status advances WITHOUT in_progress reset.
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
def test_approved_status_advances_without_inprogress_reset(mock_advance, mock_sip):
|
||||
resp = _status(APPROVED)
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_awaited_once()
|
||||
# work_item_id passed positionally
|
||||
assert "ET-700" in mock_advance.call_args.args
|
||||
# bug 3 (cause B): NO intermediate set_issue_in_progress before advance.
|
||||
mock_sip.assert_not_called()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Rejected status pulls reason from latest comment.
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.httpx.get")
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
def test_rejected_status_pulls_reason_from_comment(mock_rollback, mock_get):
|
||||
mock_get.return_value = _FakeResp(200, {"results": [
|
||||
{"comment_stripped": "old comment", "created_at": "2026-06-03T09:00:00Z"},
|
||||
{"comment_html": "<p>Needs more test coverage</p>",
|
||||
"created_at": "2026-06-03T11:30:00Z"},
|
||||
]})
|
||||
resp = _status(REJECTED)
|
||||
assert resp.status_code == 200
|
||||
mock_rollback.assert_awaited_once()
|
||||
reason = mock_rollback.call_args.args[-1]
|
||||
# latest by created_at, HTML stripped
|
||||
assert "Needs more test coverage" in reason
|
||||
assert "<p>" not in reason
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.httpx.get")
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
def test_rejected_status_no_comment_uses_fallback(mock_rollback, mock_get):
|
||||
mock_get.return_value = _FakeResp(200, {"results": []})
|
||||
resp = _status(REJECTED)
|
||||
assert resp.status_code == 200
|
||||
mock_rollback.assert_awaited_once()
|
||||
reason = mock_rollback.call_args.args[-1]
|
||||
assert "no reason comment" in reason
|
||||
243
tests/test_status_trigger.py
Normal file
243
tests/test_status_trigger.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""Feature 1: pipeline starts on status -> In Progress, not on creation.
|
||||
|
||||
* work_item.created / issue created -> NO task, NO branch, NO analyst.
|
||||
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
|
||||
* a second In Progress update while the agent is busy -> NO duplicate, NO
|
||||
restart (busy-guard).
|
||||
* In Progress returned from Needs Input (agent idle) -> agent RELAUNCHED.
|
||||
|
||||
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _created(plane_id="st-created"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "created",
|
||||
"data": {
|
||||
"id": plane_id, "name": "A valid backlog item title",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"},
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
def _to_in_progress(plane_id="st-1"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": plane_id, "name": "A valid backlog item title",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
|
||||
|
||||
def _count(plane_id):
|
||||
conn = get_db()
|
||||
n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue):
|
||||
resp = _created("st-created")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
# No task, no branch, no analyst enqueue.
|
||||
assert _count("st-created") == 0
|
||||
mock_branch.assert_not_called()
|
||||
mock_enqueue.assert_not_called()
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
||||
def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue):
|
||||
mock_enqueue.return_value = 1
|
||||
resp = _to_in_progress("st-1")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
assert _count("st-1") == 1
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone()
|
||||
conn.close()
|
||||
assert task["stage"] == "analysis"
|
||||
assert task["repo"] == "enduro-trails"
|
||||
mock_branch.assert_called_once()
|
||||
# analyst enqueued exactly once
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert mock_enqueue.call_args.args[0] == "analyst"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
||||
def test_repeat_in_progress_while_job_active_does_not_relaunch(
|
||||
mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
"""Status-only model busy-guard: a duplicate In Progress webhook that arrives
|
||||
while the stage agent still has a queued/running job must NOT relaunch the
|
||||
agent (no double launch).
|
||||
"""
|
||||
mock_enqueue.return_value = 1
|
||||
_to_in_progress("st-2")
|
||||
assert _count("st-2") == 1
|
||||
assert mock_enqueue.call_count == 1
|
||||
|
||||
# enqueue_job is mocked above, so no real job row exists. Seed an ACTIVE
|
||||
# (queued) job for the task so has_active_job_for_task() reports the agent as
|
||||
# busy -> the busy-guard fires.
|
||||
conn = get_db()
|
||||
task_id = conn.execute(
|
||||
"SELECT id FROM tasks WHERE plane_id='st-2'"
|
||||
).fetchone()[0]
|
||||
conn.execute(
|
||||
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'queued')",
|
||||
("analyst", "enduro-trails", task_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Second In Progress update. DISTINCT body (different activity old_value) so
|
||||
# webhook dedup does NOT short-circuit it — this exercises the busy-guard in
|
||||
# handle_status_start, not the delivery-dedup layer.
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "st-2", "name": "A valid backlog item title",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
assert _count("st-2") == 1 # still exactly one task
|
||||
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued (busy-guard)
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.add_comment", create=True)
|
||||
@patch("src.webhooks.plane.enqueue_job")
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
||||
def test_inprogress_from_needs_input_relaunches_analyst(
|
||||
mock_seq, mock_branch, mock_docs, mock_enqueue, mock_comment
|
||||
):
|
||||
"""Status-only answer-to-questions flow: an existing analysis task whose agent
|
||||
is IDLE (no active job — it went to Needs Input) is returned to In Progress
|
||||
-> the analyst is relaunched to read Slava's fresh comments.
|
||||
|
||||
+ double-webhook protection: a second In Progress while the relaunch job is
|
||||
active does NOT relaunch again.
|
||||
"""
|
||||
mock_enqueue.return_value = 1
|
||||
# First In Progress: starts the pipeline (creates task + enqueues analyst).
|
||||
_to_in_progress("st-ni")
|
||||
assert _count("st-ni") == 1
|
||||
assert mock_enqueue.call_count == 1
|
||||
|
||||
# The analyst finished and asked questions -> Needs Input. In our model that
|
||||
# means NO active job for the task (enqueue_job is mocked, so no job row).
|
||||
conn = get_db()
|
||||
task_id = conn.execute(
|
||||
"SELECT id FROM tasks WHERE plane_id='st-ni'"
|
||||
).fetchone()[0]
|
||||
has_job = conn.execute(
|
||||
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND status IN ('queued','running')",
|
||||
(task_id,),
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
assert has_job == 0 # agent idle
|
||||
|
||||
# Slava answers + returns the issue to In Progress (distinct body).
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "st-ni", "name": "A valid backlog item title",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "needs-input"},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
assert _count("st-ni") == 1 # no duplicate task
|
||||
assert mock_enqueue.call_count == 2 # analyst RELAUNCHED
|
||||
assert mock_enqueue.call_args.args[0] == "analyst"
|
||||
|
||||
# Seed an active job for the relaunch, then a SECOND In Progress webhook must
|
||||
# NOT relaunch again (busy-guard against double webhooks).
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'running')",
|
||||
("analyst", "enduro-trails", task_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
resp2 = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "st-ni", "name": "A valid backlog item title",
|
||||
"description_stripped": "A sufficiently long description for QG-0.",
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "x-y-z"},
|
||||
})
|
||||
assert resp2.status_code == 200
|
||||
assert mock_enqueue.call_count == 2 # still 2 — busy-guard held
|
||||
138
tests/test_taskmd_description.py
Normal file
138
tests/test_taskmd_description.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""Tests for fix/taskmd-description (3 bugs at the analyst pipeline entry/exit):
|
||||
|
||||
BUG A: start_pipeline built the analyst .task.md WITHOUT the description body
|
||||
(only Title), so analyst received a ~101-byte file and reported the
|
||||
"business request is empty". task_desc must now carry the description.
|
||||
|
||||
BUG B: issue.updated ships only changed fields, so `name` is usually absent ->
|
||||
slug/branch became "untitled". start_pipeline must pull the real name
|
||||
from the Plane API (single fetch_issue_fields GET, above the slug build)
|
||||
so the branch slug is NOT "untitled".
|
||||
|
||||
BUG C: the analyst "artifacts ready" comment used the obsolete ":approved:"
|
||||
wording. Under the status-only model it must ask for the **Approved**
|
||||
status (not ":approved:", not "In Progress") and link the docs that
|
||||
actually exist.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_taskmd_desc.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _task(plane_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
|
||||
conn.close()
|
||||
return row
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG A: description reaches the analyst .task.md
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
|
||||
@patch("src.plane_sync.fetch_issue_fields",
|
||||
return_value=("ET-011 real title",
|
||||
"REAL BUSINESS REQUEST BODY: user wants GPX upload with "
|
||||
"validation and a results map."))
|
||||
def test_taskdesc_includes_description(
|
||||
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "taskA",
|
||||
# status change payload: NO name, NO description (only changed field)
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
mock_enqueue.assert_called_once()
|
||||
# task_desc is the 3rd positional arg of enqueue_job(agent, repo, task_desc, ...)
|
||||
task_desc = mock_enqueue.call_args.args[2]
|
||||
assert "Description:" in task_desc
|
||||
# the actual description body (not just the Title) is in the file
|
||||
assert "REAL BUSINESS REQUEST BODY" in task_desc
|
||||
assert "results map" in task_desc
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# BUG B: name fetched from Plane API when payload is empty -> slug not untitled
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.enqueue_job", return_value=1)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
|
||||
@patch("src.plane_sync.fetch_issue_fields",
|
||||
return_value=("GPX upload feature",
|
||||
"A sufficiently long description so QG-0 passes cleanly."))
|
||||
def test_name_fetched_when_payload_empty(
|
||||
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||
):
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": "taskB",
|
||||
# NO name, NO description in the payload (Plane status-change shape)
|
||||
"project": ENDURO_PLANE_ID,
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
mock_fields.assert_called_once()
|
||||
row = _task("taskB")
|
||||
assert row is not None
|
||||
branch = row["branch"]
|
||||
# slug derived from the fetched name -> "gpx-upload-feature", NOT untitled
|
||||
assert "untitled" not in branch
|
||||
assert "gpx-upload-feature" in branch
|
||||
# Title in the analyst task file is the fetched name, not "untitled"
|
||||
task_desc = mock_enqueue.call_args.args[2]
|
||||
assert "Title: GPX upload feature" in task_desc
|
||||
176
tests/test_usage.py
Normal file
176
tests/test_usage.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""Feature 4: token / cost accounting tests.
|
||||
|
||||
Covers:
|
||||
* parse_usage_from_text on a REAL claude --output-format json result blob
|
||||
(captured live from CLI 2.1.142), including a leading text line.
|
||||
* parse on garbage / missing JSON -> None (never raises).
|
||||
* record_usage writes the columns; NULLs when usage is None.
|
||||
* fmt_tokens / fmt_cost formatting.
|
||||
* usage_comment string format.
|
||||
* task_usage_summary / task_summary_comment aggregate over agent_runs.
|
||||
|
||||
DB is an isolated temp file; no network or subprocess.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
from src import db as db_module # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import usage as U # noqa: E402
|
||||
|
||||
|
||||
# Real claude --output-format json result object (captured from CLI 2.1.142).
|
||||
REAL_RESULT_JSON = (
|
||||
'{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,'
|
||||
'"num_turns":1,"result":"Hi!","session_id":"abc",'
|
||||
'"total_cost_usd":0.0560175,'
|
||||
'"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,'
|
||||
'"cache_read_input_tokens":18500,"output_tokens":12100,'
|
||||
'"service_tier":"standard"},'
|
||||
'"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},'
|
||||
'"permission_denials":[]}'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db(monkeypatch):
|
||||
# get_db() reads settings.db_path live; pin it to our isolated DB.
|
||||
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# parsing
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_parse_real_result_json():
|
||||
u = U.parse_usage_from_text(REAL_RESULT_JSON)
|
||||
assert u is not None
|
||||
assert u["input_tokens"] == 45231
|
||||
assert u["output_tokens"] == 12100
|
||||
assert u["cache_read_tokens"] == 18500
|
||||
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
def test_parse_with_leading_text():
|
||||
"""The agent may print text before the trailing JSON; we still find it."""
|
||||
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
|
||||
u = U.parse_usage_from_text(text)
|
||||
assert u is not None
|
||||
assert u["input_tokens"] == 45231
|
||||
assert u["output_tokens"] == 12100
|
||||
|
||||
|
||||
def test_parse_garbage_returns_none():
|
||||
assert U.parse_usage_from_text("not json at all { broken") is None
|
||||
assert U.parse_usage_from_text("") is None
|
||||
assert U.parse_usage_from_text(None) is None
|
||||
|
||||
|
||||
def test_parse_json_without_usage_returns_none():
|
||||
assert U.parse_usage_from_text('{"hello":"world"}') is None
|
||||
|
||||
|
||||
def test_parse_from_log_missing_file_returns_none():
|
||||
assert U.parse_usage_from_log("/no/such/file.log") is None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# record_usage
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _new_run(agent="developer", task_id=1):
|
||||
conn = get_db()
|
||||
cur = conn.execute("INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", (task_id, agent))
|
||||
rid = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return rid
|
||||
|
||||
|
||||
def test_record_usage_writes_columns():
|
||||
rid = _new_run()
|
||||
u = U.parse_usage_from_text(REAL_RESULT_JSON)
|
||||
U.record_usage(rid, u)
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
|
||||
"FROM agent_runs WHERE id=?", (rid,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert row["input_tokens"] == 45231
|
||||
assert row["output_tokens"] == 12100
|
||||
assert row["cache_read_tokens"] == 18500
|
||||
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
def test_record_usage_none_writes_nulls():
|
||||
rid = _new_run()
|
||||
U.record_usage(rid, None) # must not raise
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (rid,)).fetchone()
|
||||
conn.close()
|
||||
assert row["input_tokens"] is None
|
||||
assert row["cost_usd"] is None
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# formatting
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_fmt_tokens():
|
||||
assert U.fmt_tokens(6) == "6"
|
||||
assert U.fmt_tokens(1234) == "1.2k"
|
||||
assert U.fmt_tokens(45231) == "45.2k"
|
||||
assert U.fmt_tokens(2_500_000) == "2.5M"
|
||||
assert U.fmt_tokens(None) == "0"
|
||||
|
||||
|
||||
def test_fmt_cost():
|
||||
assert U.fmt_cost(0.21) == "$0.21"
|
||||
assert U.fmt_cost(0.0560175) == "$0.06"
|
||||
assert U.fmt_cost(None) == "$0.00"
|
||||
|
||||
|
||||
def test_usage_comment_format():
|
||||
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
|
||||
c = U.usage_comment("developer", u)
|
||||
assert "Developer" in c
|
||||
assert "45.2k in" in c
|
||||
assert "12.1k out" in c
|
||||
assert "$0.21" in c
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# task summary
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_task_summary_aggregates_over_agents():
|
||||
# two runs for the same task: developer + tester
|
||||
for agent, ti, to, cost in [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]:
|
||||
rid = _new_run(agent=agent, task_id=42)
|
||||
U.record_usage(rid, {"input_tokens": ti, "output_tokens": to,
|
||||
"cache_read_tokens": 0, "cost_usd": cost})
|
||||
|
||||
s = U.task_usage_summary(42)
|
||||
assert s["total_in"] == 1500
|
||||
assert s["total_out"] == 300
|
||||
assert abs(s["total_cost"] - 0.15) < 1e-9
|
||||
agents = {a for a, *_ in s["per_agent"]}
|
||||
assert agents == {"developer", "tester"}
|
||||
|
||||
comment = U.task_summary_comment(42)
|
||||
assert "1.5k" in comment # total in
|
||||
assert "$0.15" in comment # total cost
|
||||
assert "Developer" in comment
|
||||
assert "Tester" in comment
|
||||
171
tests/test_verdict_status.py
Normal file
171
tests/test_verdict_status.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Status-only verdict model: verdict statuses Approved / Rejected.
|
||||
|
||||
* issue updated -> Approved : calls _try_advance_stage, with NO intermediate
|
||||
set_issue_in_progress reset (bug 3 fix).
|
||||
* issue updated -> Rejected : calls _rollback_stage, with the reason pulled
|
||||
from the issue's latest comment.
|
||||
* COMMENTS NEVER trigger the pipeline: a :approved: / :rejected: comment is a
|
||||
pure no-op (the comment-based control mechanism was removed).
|
||||
|
||||
We mock the shared engine entry points (_try_advance_stage / _rollback_stage)
|
||||
and assert they fire ONLY for the status trigger, never for a comment.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_verdict.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
import pytest # noqa: E402
|
||||
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import projects as P # noqa: E402
|
||||
from src.projects import reload_projects # noqa: E402
|
||||
|
||||
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5"
|
||||
REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97"
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(monkeypatch):
|
||||
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||
import src.db as _db
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||
registry_json = (
|
||||
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||
)
|
||||
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||
reload_projects()
|
||||
# Seed a task at the 'review' stage for plane_id 'v-1'.
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("v-1", "ET-500", "enduro-trails", "feature/ET-500-x", "review", "v-1"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
yield
|
||||
reload_projects()
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
def _status(state_id, plane_id="v-1", old="prev"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"id": plane_id, "name": "Verdict task", "project": ENDURO_PLANE_ID,
|
||||
"state": {"id": state_id, "name": "X", "group": "started"},
|
||||
},
|
||||
"activity": {"field": "state", "new_value": state_id, "old_value": old},
|
||||
})
|
||||
|
||||
|
||||
def _comment(text, plane_id="v-1"):
|
||||
return client.post("/webhook/plane", json={
|
||||
"event": "issue_comment", "action": "created",
|
||||
"data": {"work_item_id": plane_id, "comment_stripped": text,
|
||||
"project": ENDURO_PLANE_ID},
|
||||
})
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
def __init__(self, status_code, payload):
|
||||
self.status_code = status_code
|
||||
self._payload = payload
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
def _comments_response(comments):
|
||||
return _FakeResp(200, {"results": comments})
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Approved status -> advance (no in_progress reset)
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
def test_approved_status_advances(mock_advance, mock_sip):
|
||||
resp = _status(APPROVED)
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_awaited_once()
|
||||
# advanced the right task (ET-500 at review)
|
||||
args = mock_advance.call_args.args
|
||||
assert "ET-500" in args # work_item_id is passed positionally
|
||||
# bug 3 fix: handle_verdict no longer resets the status to In Progress.
|
||||
mock_sip.assert_not_called()
|
||||
|
||||
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
def test_approved_comment_is_noop(mock_advance, mock_rollback, mock_sip):
|
||||
"""Status-only model: a :approved: comment NEVER advances the pipeline."""
|
||||
resp = _comment(":approved:")
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_not_called()
|
||||
mock_rollback.assert_not_called()
|
||||
mock_sip.assert_not_called()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Rejected status -> rollback (reason from latest comment)
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane.httpx.get")
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
def test_rejected_status_rolls_back(mock_rollback, mock_get):
|
||||
mock_get.return_value = _comments_response(
|
||||
[{"comment_stripped": "ADR missing tradeoffs",
|
||||
"created_at": "2026-06-03T10:00:00Z"}]
|
||||
)
|
||||
resp = _status(REJECTED)
|
||||
assert resp.status_code == 200
|
||||
mock_rollback.assert_awaited_once()
|
||||
# reason pulled from the latest comment
|
||||
reason = mock_rollback.call_args.args[-1]
|
||||
assert "ADR missing tradeoffs" in reason
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.httpx.get")
|
||||
@patch("src.plane_sync.set_issue_in_progress")
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
def test_rejected_comment_is_noop(mock_advance, mock_rollback, mock_sip, mock_get):
|
||||
"""Status-only model: a :rejected: comment NEVER rolls back the pipeline."""
|
||||
resp = _comment(":rejected: bad ADR")
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_not_called()
|
||||
mock_rollback.assert_not_called()
|
||||
mock_sip.assert_not_called()
|
||||
mock_get.assert_not_called()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Unknown verdict status -> no-op
|
||||
# --------------------------------------------------------------------------- #
|
||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||
def test_other_status_no_verdict_action(mock_advance, mock_rollback):
|
||||
# In Review status is not a verdict -> neither advance nor rollback.
|
||||
resp = _status("38fb1f64-aa1e-48a3-92e0-0b109679046b") # in_review
|
||||
assert resp.status_code == 200
|
||||
mock_advance.assert_not_called()
|
||||
mock_rollback.assert_not_called()
|
||||
@@ -211,14 +211,21 @@ def test_gitea_fallback_hash_when_no_delivery_header():
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
|
||||
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
|
||||
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.
|
||||
|
||||
Feature 1: the pipeline now starts on a status change to In Progress, not on
|
||||
creation, so this drives the dedup test with an 'issue updated' event.
|
||||
"""
|
||||
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||
body = {
|
||||
"event": "work_item.created",
|
||||
"event": "issue",
|
||||
"action": "updated",
|
||||
"data": {
|
||||
"id": "pd-001",
|
||||
"name": "Dedup plane task",
|
||||
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
||||
"project": "proj-1",
|
||||
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||
},
|
||||
}
|
||||
r1 = client.post("/webhook/plane", json=body)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import pytest
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch, MagicMock, AsyncMock
|
||||
@@ -95,27 +96,32 @@ def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch):
|
||||
assert ids[1] == "ET-002"
|
||||
|
||||
|
||||
APPROVED_STATE = "a519a341-dada-4a91-8910-7604f82b79c5"
|
||||
REJECTED_STATE = "ba958f3c-5db5-461d-8f82-89425e413b97"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane.launcher")
|
||||
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
|
||||
"""Comment :approved: at stage=analysis → advance to architecture."""
|
||||
"""Status-only model: Approved STATUS at stage=analysis -> advance to
|
||||
architecture. A comment never triggers this.
|
||||
"""
|
||||
# Patch repos_dir for QG check
|
||||
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
|
||||
|
||||
# Create task first
|
||||
client.post("/webhook/plane", json={
|
||||
"event": "work_item.created",
|
||||
"data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"}
|
||||
})
|
||||
|
||||
# Get the task to find work_item_id
|
||||
# Seed an analysis task directly (creation no longer makes a task post-PR#11).
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("adv-001", "ET-001", "enduro-trails", "feature/ET-001-x", "analysis", "adv-001"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
work_item_id = task["work_item_id"]
|
||||
work_item_id = "ET-001"
|
||||
|
||||
# Create required analysis files
|
||||
# Create required analysis files so the analysis QG passes.
|
||||
wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
|
||||
wi_dir.mkdir(parents=True)
|
||||
(wi_dir / "01-brd.md").write_text("# BRD")
|
||||
@@ -123,16 +129,15 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
|
||||
(wi_dir / "03-acceptance-criteria.md").write_text("# AC")
|
||||
(wi_dir / "04-test-plan.yaml").write_text("tests: []")
|
||||
|
||||
# Mock launcher
|
||||
mock_launcher.launch.return_value = 1
|
||||
|
||||
# Send approved comment
|
||||
# Send Approved STATUS change.
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "comment.created",
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"work_item_id": "adv-001",
|
||||
"comment": "Looks good :approved:"
|
||||
}
|
||||
"id": "adv-001", "name": "Advance test", "project": "proj-1",
|
||||
"state": {"id": APPROVED_STATE, "name": "Approved", "group": "completed"},
|
||||
},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
|
||||
@@ -143,29 +148,39 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
|
||||
assert task["stage"] == "architecture"
|
||||
|
||||
|
||||
@patch("src.webhooks.plane.httpx.get")
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_rejected_rolls_back(mock_docs, mock_branch):
|
||||
"""Comment :rejected: rolls back stage."""
|
||||
# Create task
|
||||
client.post("/webhook/plane", json={
|
||||
"event": "work_item.created",
|
||||
"data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"}
|
||||
})
|
||||
def test_plane_rejected_rolls_back(mock_docs, mock_branch, mock_get):
|
||||
"""Status-only model: Rejected STATUS rolls back stage. A comment never
|
||||
triggers this; the reason is pulled from the latest comment.
|
||||
"""
|
||||
class _R:
|
||||
status_code = 200
|
||||
@staticmethod
|
||||
def json():
|
||||
return {"results": [
|
||||
{"comment_stripped": "missing ADR", "created_at": "2026-06-03T10:00:00Z"}
|
||||
]}
|
||||
mock_get.return_value = _R()
|
||||
|
||||
# Manually set stage to architecture
|
||||
# Seed an architecture task directly.
|
||||
conn = get_db()
|
||||
conn.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'")
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("rej-001", "ET-002", "enduro-trails", "feature/ET-002-x", "architecture", "rej-001"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Send rejected comment
|
||||
# Send Rejected STATUS change.
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "comment.created",
|
||||
"event": "issue", "action": "updated",
|
||||
"data": {
|
||||
"work_item_id": "rej-001",
|
||||
"comment": "Not ready :rejected:"
|
||||
}
|
||||
"id": "rej-001", "name": "Reject test", "project": "proj-1",
|
||||
"state": {"id": REJECTED_STATE, "name": "Rejected", "group": "cancelled"},
|
||||
},
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
|
||||
@@ -258,6 +273,46 @@ def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
|
||||
assert task["stage"] == "review"
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.launcher")
|
||||
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
|
||||
"""BUG 6: CI failure at development is now the authoritative QG gate failing.
|
||||
|
||||
It must notify QG failure (not silently suppress) and must NOT advance the stage.
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
|
||||
("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
resp = client.post(
|
||||
"/webhook/gitea",
|
||||
json={
|
||||
"state": "failure",
|
||||
"branches": [{"name": "feature/ET-011-test"}],
|
||||
"repository": {"name": "enduro-trails"},
|
||||
},
|
||||
headers={"X-Gitea-Event": "status"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# QG failure was reported for the development stage with check_ci_green.
|
||||
assert mock_notify.called
|
||||
args, kwargs = mock_notify.call_args
|
||||
call = list(args) + list(kwargs.values())
|
||||
assert "development" in call
|
||||
assert "check_ci_green" in call
|
||||
|
||||
# Stage did NOT advance.
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'").fetchone()
|
||||
conn.close()
|
||||
assert task["stage"] == "development"
|
||||
|
||||
|
||||
def test_gitea_webhook_pr():
|
||||
"""PR event is accepted."""
|
||||
resp = client.post(
|
||||
@@ -287,3 +342,94 @@ def test_plane_webhook_event_logged():
|
||||
conn.close()
|
||||
assert event is not None
|
||||
assert event["source"] == "plane"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 7: red CI on development must bounce the task back to the developer
|
||||
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
|
||||
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
|
||||
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ci_failure_payload():
|
||||
return {
|
||||
"state": "failure",
|
||||
"branches": [{"name": "feature/ET-011-test"}],
|
||||
"repository": {"name": "enduro-trails"},
|
||||
}
|
||||
|
||||
|
||||
def _mock_db_with_retry_count(count):
|
||||
"""Build a get_db() mock whose retry_count query returns `count`."""
|
||||
conn = MagicMock()
|
||||
conn.execute.return_value.fetchone.return_value = {"cnt": count}
|
||||
return conn
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_error")
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.enqueue_job")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_db")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_ci_failure_development_retries_developer_under_limit(
|
||||
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||
mock_enqueue, mock_qg, mock_err,
|
||||
):
|
||||
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
|
||||
from src.webhooks.gitea import handle_ci_status
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||
}
|
||||
mock_get_db.return_value = _mock_db_with_retry_count(0)
|
||||
mock_enqueue.return_value = 42
|
||||
|
||||
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||
|
||||
# QG failure was still reported (Slava sees both the failure and the retry).
|
||||
assert mock_qg.called
|
||||
# developer was re-enqueued.
|
||||
assert mock_enqueue.called
|
||||
assert mock_enqueue.call_args[0][0] == "developer"
|
||||
# No escalation.
|
||||
assert not mock_err.called
|
||||
# Stage stays on development — no update_task_stage in the CI-failure path.
|
||||
assert not mock_update_stage.called
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_error")
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.enqueue_job")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_db")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_ci_failure_development_escalates_at_limit(
|
||||
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||
mock_enqueue, mock_qg, mock_err,
|
||||
):
|
||||
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
|
||||
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||
}
|
||||
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
|
||||
|
||||
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||
|
||||
# QG failure still reported.
|
||||
assert mock_qg.called
|
||||
# developer NOT re-enqueued at the cap.
|
||||
assert not mock_enqueue.called
|
||||
# Escalation message mentions CI failure.
|
||||
assert mock_err.called
|
||||
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
|
||||
assert "Max developer retries" in err_msg
|
||||
assert "after CI failure" in err_msg
|
||||
# Stage untouched.
|
||||
assert not mock_update_stage.called
|
||||
|
||||
Reference in New Issue
Block a user