diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 75b7cb8..dd2be7a 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -209,9 +209,15 @@ class AgentLauncher: # No git fetch/checkout here: ensure_worktree() already put the worktree on # the right branch. The agent simply runs inside its isolated work_path. + # Feature 4 (token usage): --output-format json makes claude emit a single + # result JSON (with usage + total_cost_usd) at the end of stdout. The log + # still captures it; _monitor_agent parses the trailing JSON after the run + # to record per-agent tokens/cost. _monitor_agent's failure handling keys + # off the process exit_code (not stdout shape), so this is safe. cmd = ( f'cd {work_path} && ' f'{self.CLAUDE_BIN} --print ' + f'--output-format json ' f'{model_flag}' f'"$(cat {task_file})" ' f'--system-prompt "$(cat {system_prompt})" ' @@ -400,6 +406,17 @@ class AgentLauncher: notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s) + # Feature 4: parse token usage / cost from the (json) run log and record + # it on the agent_runs row. Never fatal — a garbled/missing JSON records + # NULLs and logs a warning so a broken run can't crash the monitor. + try: + from ..usage import parse_usage_from_log, record_usage + _usage = parse_usage_from_log(output_path) if output_path else None + record_usage(run_id, _usage) + except Exception as e: + logger.warning(f"run_id={run_id}: usage accounting failed: {e}") + _usage = None + # Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4), # NOT in the shared /repos/. The worktree is already on `branch` # (ensure_worktree did the checkout), so no checkout is needed here. @@ -490,6 +507,14 @@ class AgentLauncher: from ..notifications import send_telegram send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log") + # Feature 4: post the per-agent usage comment under that agent's bot, and + # — for the deployer finishing the task — the per-task usage summary. + if exit_code == 0: + try: + self._post_usage_comments(run_id, agent, repo, branch, _usage) + except Exception as e: + logger.warning(f"run_id={run_id}: usage comment failed: {e}") + # Auto-advance stage if agent finished successfully and QG passes if exit_code == 0: self._try_advance_stage(run_id, agent, repo, branch) @@ -654,6 +679,32 @@ class AgentLauncher: logger.error(f"Auto-advance failed for run_id={run_id}: {e}") + def _post_usage_comments(self, run_id, agent, repo, branch, usage): + """Feature 4: post the per-agent usage comment (and Deployer summary). + + - Always (on success, with a work_item_id): a per-agent finish comment + with token/cost, authored by the finishing agent's Plane bot. + - When the deployer finishes: also a per-task summary (SUM over + agent_runs GROUP BY agent), authored by the deployer. + """ + from ..usage import usage_comment, task_summary_comment + conn = get_db() + row = conn.execute( + "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?", + (repo, branch), + ).fetchone() + conn.close() + if not row: + return + task_id, work_item_id = row[0], row[1] + if not work_item_id: + return + plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent) + if agent == "deployer": + plane_add_comment( + work_item_id, task_summary_comment(task_id), author="deployer" + ) + def _ensure_pr(self, repo: str, branch: str, run_id: int): import httpx owner = settings.gitea_owner diff --git a/src/db.py b/src/db.py index 0b77610..f8bfa9d 100644 --- a/src/db.py +++ b/src/db.py @@ -77,6 +77,13 @@ def init_db(): "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery " "ON events(delivery_id) WHERE delivery_id IS NOT NULL" ) + # Feature 4 (token usage): per-run token / cost accounting. Parsed from the + # claude --output-format json result by the launcher monitor. Idempotent + # ALTERs (no-op once the columns exist) so this is safe on the live prod DB. + _ensure_column(conn, "agent_runs", "input_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "output_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "cost_usd", "REAL") conn.commit() conn.close() diff --git a/src/plane_sync.py b/src/plane_sync.py index 1f9fd72..e96900a 100644 --- a/src/plane_sync.py +++ b/src/plane_sync.py @@ -84,7 +84,12 @@ def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}") return PROJECT_ID -# Plane state IDs +# Plane state IDs. +# TODO(ORCH-10): these UUIDs are PER-PROJECT. The 6 stage-visibility / verdict +# statuses below were created only in the enduro project (7a79f0a9-...). One +# project is in prod today, so a single global dict is acceptable. When more +# projects are onboarded these must be resolved per project (see ORCH-10 in +# BACKLOG.md / the ORCH-6 project registry) — do NOT hardcode globally then. PLANE_STATES = { "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122", "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", @@ -94,16 +99,39 @@ PLANE_STATES = { "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920", "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17", + # Feature 3 (stage visibility) — per-stage statuses on the board. + "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d", + "development": "9920609b-f140-4e46-ab95-89acda8412c8", + "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed", + "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02", + # Feature 2 (verdict statuses) — Approved / Rejected. + "approved": "a519a341-dada-4a91-8910-7604f82b79c5", + "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97", } -# Map orchestrator stages to Plane states +# Feature 3: map an orchestrator stage -> the Plane status to show on the board +# when the pipeline ENTERS that stage. analysis stays driven by the existing +# in_progress/in_review/needs_input logic (no dedicated status). deploy keeps +# in_progress until done. Needs Input / In Review / Blocked remain higher +# priority and are set explicitly elsewhere — do NOT override them from here. +STAGE_VISIBILITY_STATE = { + "architecture": "architecture", + "development": "development", + "review": "review", + "testing": "testing", +} + +# Map orchestrator stages to Plane states (used by update_issue_state / +# notify_stage_change). Feature 3: architecture/development/review/testing now +# point at their dedicated board statuses so the task physically moves across +# columns. analysis -> in_progress, deploy -> in_progress, done -> done. STAGE_TO_STATE = { "created": PLANE_STATES["todo"], "analysis": PLANE_STATES["in_progress"], - "architecture": PLANE_STATES["in_progress"], - "development": PLANE_STATES["in_progress"], - "review": PLANE_STATES["in_progress"], - "testing": PLANE_STATES["in_progress"], + "architecture": PLANE_STATES["architecture"], + "development": PLANE_STATES["development"], + "review": PLANE_STATES["review"], + "testing": PLANE_STATES["testing"], "deploy": PLANE_STATES["in_progress"], "done": PLANE_STATES["done"], } @@ -242,6 +270,21 @@ def set_issue_in_progress(work_item_id: str, project_id: str = None): _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id) +def set_issue_stage_state(work_item_id: str, stage: str, project_id: str = None): + """Feature 3: move the issue to the board status for a pipeline stage. + + Only the visible-stage statuses (architecture/development/review/testing) + are driven here — stages without a dedicated status (analysis/deploy) are a + no-op so the existing in_progress/in_review/needs_input logic stays in + charge. By design this does NOT touch Needs Input / In Review / Blocked, + which are higher priority and set explicitly by their own helpers. + """ + state_key = STAGE_VISIBILITY_STATE.get(stage) + if not state_key: + return + _set_issue_state_direct(work_item_id, PLANE_STATES[state_key], project_id) + + def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None): """Set issue state directly by state_id.""" project_id = _resolve_project_id(work_item_id, project_id) diff --git a/src/usage.py b/src/usage.py new file mode 100644 index 0000000..8968381 --- /dev/null +++ b/src/usage.py @@ -0,0 +1,268 @@ +"""Feature 4: token / cost accounting for agent runs. + +claude --output-format json emits a single result JSON object at the end of the +run log with fields: + total_cost_usd + usage.input_tokens / output_tokens / cache_read_input_tokens / + cache_creation_input_tokens + modelUsage, num_turns, duration_ms + +This module parses that JSON out of a (text-or-json) run log, records the usage +on the agent_runs row, formats a Plane comment for the finishing agent, and +builds the per-task summary the Deployer posts on deploy/done. + +Everything here is defensive: a missing/garbled JSON never raises \u2014 we record +NULL/0 and log a warning so a broken agent run can't crash the monitor. +""" + +import json +import logging + +from .db import get_db + +logger = logging.getLogger("orchestrator.usage") + + +def parse_usage_from_text(text: str) -> dict | None: + """Extract the claude result-JSON usage from a run log's text. + + The log may contain plain text before/after the JSON; with + --output-format json the JSON is the final object. We scan for the LAST + top-level '{' ... '}' that parses and carries usage/total_cost_usd. + + Returns a normalised dict + {input_tokens, output_tokens, cache_read_tokens, cost_usd} + (ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found. + """ + if not text: + return None + + candidate = _extract_last_json_object(text) + if candidate is None: + return None + + usage = candidate.get("usage") or {} + if not isinstance(usage, dict): + usage = {} + + cost = candidate.get("total_cost_usd") + if cost is None: + cost = candidate.get("cost_usd") + + # If there is neither a usage block nor a cost, this isn't a result object. + if not usage and cost is None: + return None + + def _int(v): + try: + return int(v) + except (TypeError, ValueError): + return 0 + + def _float(v): + try: + return float(v) + except (TypeError, ValueError): + return 0.0 + + return { + "input_tokens": _int(usage.get("input_tokens")), + "output_tokens": _int(usage.get("output_tokens")), + "cache_read_tokens": _int( + usage.get("cache_read_input_tokens", usage.get("cache_read_tokens")) + ), + "cost_usd": _float(cost), + } + + +def _extract_last_json_object(text: str) -> dict | None: + """Return the last balanced top-level JSON object in `text` that parses. + + Scans from the end for '}' and walks back to the matching '{' using a depth + counter (string-aware), trying json.loads on each candidate. Robust to log + lines or text emitted before the JSON. + """ + # Fast path: the whole stripped text is the JSON. + stripped = text.strip() + try: + obj = json.loads(stripped) + if isinstance(obj, dict): + return obj + except (ValueError, TypeError): + pass + + # Otherwise find the last balanced { ... } block. + end = len(text) + while True: + close = text.rfind("}", 0, end) + if close == -1: + return None + depth = 0 + in_str = False + esc = False + start = None + for i in range(close, -1, -1): + ch = text[i] + if in_str: + if esc: + esc = False + elif ch == "\\": + esc = True + elif ch == '"': + in_str = False + continue + if ch == '"': + in_str = True + elif ch == "}": + depth += 1 + elif ch == "{": + depth -= 1 + if depth == 0: + start = i + break + if start is not None: + blob = text[start:close + 1] + try: + obj = json.loads(blob) + if isinstance(obj, dict): + return obj + except (ValueError, TypeError): + pass + end = close # keep scanning earlier in the text + + +def parse_usage_from_log(path: str) -> dict | None: + """Read a run log file and parse usage from it. Never raises.""" + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + return parse_usage_from_text(f.read()) + except OSError as e: + logger.warning(f"parse_usage_from_log: cannot read {path}: {e}") + return None + + +def record_usage(run_id: int, usage: dict | None): + """Write parsed usage onto the agent_runs row. NULLs if usage is None.""" + if usage is None: + logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs") + usage = {} + conn = get_db() + try: + conn.execute( + "UPDATE agent_runs SET input_tokens=?, output_tokens=?, " + "cache_read_tokens=?, cost_usd=? WHERE id=?", + ( + usage.get("input_tokens"), + usage.get("output_tokens"), + usage.get("cache_read_tokens"), + usage.get("cost_usd"), + run_id, + ), + ) + conn.commit() + finally: + conn.close() + + +def fmt_tokens(n) -> str: + """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.""" + try: + n = int(n or 0) + except (TypeError, ValueError): + n = 0 + if n >= 1_000_000: + return f"{n / 1_000_000:.1f}M" + if n >= 1_000: + return f"{n / 1_000:.1f}k" + return str(n) + + +def fmt_cost(c) -> str: + """Format USD cost with 2 decimals: '$0.21'.""" + try: + c = float(c or 0.0) + except (TypeError, ValueError): + c = 0.0 + return f"${c:.2f}" + + +# Pretty agent names for comments (mirrors STAGE_AUTHORS roles). +AGENT_DISPLAY = { + "analyst": "Analyst", + "architect": "Architect", + "developer": "Developer", + "reviewer": "Reviewer", + "tester": "Tester", + "deployer": "Deployer", +} + + +def usage_comment(agent: str, usage: dict | None) -> str: + """Build the per-agent finish comment, e.g. + '\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'. + """ + usage = usage or {} + name = AGENT_DISPLAY.get(agent, agent.capitalize()) + icon = AGENT_ICON.get(agent, "\u2705") + return ( + f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 " + f"{fmt_tokens(usage.get('input_tokens'))} in / " + f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 " + f"{fmt_cost(usage.get('cost_usd'))}" + ) + + +AGENT_ICON = { + "analyst": "\U0001f50d", + "architect": "\U0001f4d0", + "developer": "\U0001f4bb", + "reviewer": "\U0001f50e", + "tester": "\U0001f9ea", + "deployer": "\U0001f680", +} + + +def task_usage_summary(task_id: int) -> dict: + """Aggregate agent_runs usage for a task. + + Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}. + """ + conn = get_db() + try: + rows = conn.execute( + "SELECT agent, " + "COALESCE(SUM(input_tokens),0), " + "COALESCE(SUM(output_tokens),0), " + "COALESCE(SUM(cost_usd),0.0) " + "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent", + (task_id,), + ).fetchall() + finally: + conn.close() + per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows] + total_in = sum(r[1] for r in per_agent) + total_out = sum(r[2] for r in per_agent) + total_cost = sum(r[3] for r in per_agent) + return { + "total_in": total_in, + "total_out": total_out, + "total_cost": total_cost, + "per_agent": per_agent, + } + + +def task_summary_comment(task_id: int) -> str: + """Build the Deployer end-of-task summary comment (Feature 4, variant B).""" + s = task_usage_summary(task_id) + lines = [ + f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: " + f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / " + f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 " + f"{fmt_cost(s['total_cost'])}" + ] + for agent, ti, to, cost in s["per_agent"]: + name = AGENT_DISPLAY.get(agent, agent.capitalize()) + lines.append( + f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}" + ) + return "\n".join(lines) diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 2b854d1..1bd3c94 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -92,38 +92,139 @@ async def plane_webhook(request: Request): return {"status": "ignored", "reason": "unknown project"} if (event == "work_item.created") or (event == "issue" and action == "created"): + # Feature 1: creation NO LONGER starts the pipeline. Slava keeps the + # backlog until he moves an issue to In Progress. We only run a soft + # QG-0 sanity log here (no branch, no analyst, no task row). await handle_work_item_created(data, project_id) + elif (event == "work_item.updated") or (event == "issue" and action == "updated"): + # Feature 1 & 2: status changes drive the pipeline. + # Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent) + # -> Approved : advance (== :approved: comment) + # -> Rejected : rollback (== :rejected: comment) + await handle_issue_updated(data, project_id) elif (event == "comment.created") or (event == "issue_comment" and action == "created"): await handle_comment(data, project_id) return {"status": "accepted"} -async def handle_work_item_created(data: dict, project_id: str = ""): +def _state_id(data: dict) -> str: + """Extract the new Plane state UUID from an 'issue updated' payload. + + Real payload (verified from prod events): data.state is + {id, name, color, group}. Some payloads carry state as a bare UUID string. """ - New work item created in Plane. - QG-0: validate title, description, priority. - If valid: create branch, init docs, launch analyst. - If invalid: comment with what's missing, set Blocked. + state = data.get("state") + if isinstance(state, dict): + return state.get("id", "") or "" + if isinstance(state, str): + return state + return "" + + +async def handle_issue_updated(data: dict, project_id: str = ""): + """Feature 1 & 2: react to a Plane issue status change. + + Routes the NEW state UUID (data.state.id) to: + - in_progress : start the pipeline if this issue has no task yet + (idempotent — an existing task is NOT restarted; protects handle_comment + which also flips issues to In Progress during approve/answer flows). + - approved : same as a :approved: comment (advance current stage). + - rejected : same as a :rejected: comment (rollback + relaunch). + Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.) + is ignored here — those are statuses the orchestrator itself sets. + """ + from ..plane_sync import PLANE_STATES + + plane_id = str(data.get("id") or "") + new_state = _state_id(data) + if not plane_id or not new_state: + logger.info("issue updated without id/state, ignoring") + return + + if new_state == PLANE_STATES["in_progress"]: + await handle_status_start(data, project_id) + elif new_state == PLANE_STATES["approved"]: + await handle_verdict(data, project_id, approved=True) + elif new_state == PLANE_STATES["rejected"]: + await handle_verdict(data, project_id, approved=False) + else: + logger.info(f"issue {plane_id} updated to state {new_state[:8]}..., no pipeline action") + + +async def handle_status_start(data: dict, project_id: str = ""): + """Feature 1: an issue moved into In Progress -> start the pipeline. + + Idempotent: if a task already exists for this plane_id, do nothing (no dup, + no analyst restart). This is what makes handle_comment's set_issue_in_progress + safe — by then the task already exists, so the start is skipped. + """ + plane_id = str(data.get("id") or "") + existing = get_task_by_plane_id(plane_id) + if existing: + logger.info( + f"Status->In Progress for {plane_id}: task already exists " + f"(stage={existing.get('stage')}), not restarting" + ) + return + logger.info(f"Status->In Progress for {plane_id}: starting pipeline") + await start_pipeline(data, project_id) + + +async def handle_verdict(data: dict, project_id: str, approved: bool): + """Feature 2 (variant B): a status verdict mirrors the comment verdicts. + + Approved status == :approved: comment -> _try_advance_stage. + Rejected status == :rejected: comment -> rollback to previous stage + relaunch + (reason is unknown from a status change; Slava writes it in a separate + comment, so we pass a fixed note). + """ + plane_id = str(data.get("id") or "") + task = get_task_by_plane_id(plane_id) + if not task: + logger.warning(f"Verdict status for {plane_id} but no task found, ignoring") + return + + task_id = task["id"] + current_stage = task["stage"] + repo = task["repo"] + work_item_id = task.get("work_item_id", "") + branch = task.get("branch", "") + + if approved: + from ..plane_sync import set_issue_in_progress + set_issue_in_progress(work_item_id) + logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}") + await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) + return + + # Rejected: mirror the :rejected: comment rollback branch. + reason = "(rejected via status, see latest comment)" + await _rollback_stage( + task_id, current_stage, repo, work_item_id, branch, reason + ) + + +async def handle_work_item_created(data: dict, project_id: str = ""): + """Feature 1: creation does NOT start the pipeline anymore. + + The pipeline is started when Slava moves the issue into In Progress + (handle_status_start -> start_pipeline). On creation we only run a SOFT QG-0 + sanity check and log the result — NO branch, NO docs, NO analyst, NO task row + — so the issue can sit in the backlog until Slava is ready. """ plane_id = data.get("id", "") name = data.get("name", "untitled") description = data.get("description_stripped", data.get("description", "")) - priority = data.get("priority", {}) - priority_name = priority if isinstance(priority, str) else priority.get("name", "") + errors = _qg0_errors(name, description) + if errors: + logger.info(f"work_item.created {plane_id}: soft QG-0 warnings: {errors}") + else: + logger.info(f"work_item.created {plane_id} ('{name}'): in backlog, awaiting In Progress") - # ORCH-6: resolve repo / prefix / Plane project from the registry instead of - # the single hardcoded default_repo. - if not project_id: - project_id = data.get("project") or data.get("project_id") or "" - proj = get_project_by_plane_id(project_id) - if not proj: - logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}") - return - repo = proj.repo - plane_project_id = proj.plane_project_id - # QG-0 validation +def _qg0_errors(name: str, description: str) -> list: + """QG-0 validation: returns a list of human-readable problems (empty = OK).""" errors = [] if not name or len(name) < 5: errors.append("Title \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 5 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") @@ -132,6 +233,36 @@ async def handle_work_item_created(data: dict, project_id: str = ""): if not description or len(description.strip()) < 20: errors.append("Description \u0441\u043b\u0438\u0448\u043a\u043e\u043c \u043a\u043e\u0440\u043e\u0442\u043a\u0438\u0439 (\u043d\u0443\u0436\u043d\u043e >= 20 \u0441\u0438\u043c\u0432\u043e\u043b\u043e\u0432)") + return errors + + +async def start_pipeline(data: dict, project_id: str = ""): + """Feature 1: start the pipeline for an issue (moved to In Progress). + + This is the body extracted from the old handle_work_item_created: resolve the + project, run QG-0 (hard — blocks on failure), create the work item id + + branch + initial docs, insert the task row, and enqueue the analyst. + + Callers (handle_status_start) already guarantee no existing task for this + plane_id, so this never duplicates. + """ + plane_id = data.get("id", "") + name = data.get("name", "untitled") + description = data.get("description_stripped", data.get("description", "")) + + # ORCH-6: resolve repo / prefix / Plane project from the registry instead of + # the single hardcoded default_repo. + if not project_id: + project_id = data.get("project") or data.get("project_id") or "" + proj = get_project_by_plane_id(project_id) + if not proj: + logger.warning(f"start_pipeline: unknown project '{project_id}', ignoring {plane_id}") + return + repo = proj.repo + plane_project_id = proj.plane_project_id + + # QG-0 validation (hard gate on pipeline start) + errors = _qg0_errors(name, description) if errors: # QG-0 failed error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors) @@ -240,36 +371,7 @@ async def handle_comment(data: dict, project_id: str = ""): if ":rejected:" in comment_body: # Extract reason (text after :rejected:) reason = comment_body.split(":rejected:", 1)[-1].strip()[:300] - - if current_stage == "analysis": - # Already in analysis — just relaunch analyst with rejection reason - from ..plane_sync import set_issue_in_progress - set_issue_in_progress(work_item_id) - task_desc = ( - f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" - f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " - f"Reason: {reason}\nRevise and improve." - ) - new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) - from ..plane_sync import add_comment as _plane_comment - _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst") - logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") - else: - # Rollback to previous stage - prev_stage = get_previous_stage(current_stage) - if prev_stage: - update_task_stage(task_id, prev_stage) - from ..plane_sync import set_issue_in_progress - set_issue_in_progress(work_item_id) - notify_stage_change(task_id, current_stage, prev_stage) - plane_notify_stage(work_item_id, current_stage, prev_stage) - from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS - _plane_comment( - work_item_id, - f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}", - author=STAGE_AUTHORS.get(prev_stage, "stream"), - ) - logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}") + await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason) return if ":approved:" in comment_body: @@ -338,6 +440,72 @@ async def handle_comment(data: dict, project_id: str = ""): logger.error(f"Failed to check issue state: {e}") +async def _rollback_stage( + task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str, + reason: str, +): + """Shared :rejected: / Rejected-status rollback (Feature 2 variant B). + + Both the :rejected: comment and a status change to Rejected funnel here so + the two mechanisms behave identically: + - at analysis: relaunch the analyst with the rejection reason; + - otherwise: roll back to the previous stage and relaunch its agent + (via the existing rollback notify + an enqueue of the prev-stage agent). + """ + if current_stage == "analysis": + # Already in analysis — just relaunch analyst with rejection reason + from ..plane_sync import set_issue_in_progress + set_issue_in_progress(work_item_id) + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " + f"Reason: {reason}\nRevise and improve." + ) + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) + from ..plane_sync import add_comment as _plane_comment + _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}", author="analyst") + logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") + return + + # Rollback to previous stage + prev_stage = get_previous_stage(current_stage) + if not prev_stage: + logger.info(f"Task {task_id}: rejected at {current_stage} but no previous stage") + return + update_task_stage(task_id, prev_stage) + notify_stage_change(task_id, current_stage, prev_stage) + # Feature 3: plane_notify_stage moves the board to the prev stage's status. + plane_notify_stage(work_item_id, current_stage, prev_stage) + # Then put it back to In Progress so the relaunched agent is clearly working. + from ..plane_sync import set_issue_in_progress + set_issue_in_progress(work_item_id) + from ..plane_sync import add_comment as _plane_comment, STAGE_AUTHORS + _plane_comment( + work_item_id, + f"\U0001f504 \u041e\u0442\u043a\u0430\u0442: {current_stage} \u2192 {prev_stage}. \u041f\u0440\u0438\u0447\u0438\u043d\u0430: {reason}", + author=STAGE_AUTHORS.get(prev_stage, "stream"), + ) + # Relaunch the previous stage's agent so the rollback actually re-runs work. + # STAGE_AUTHORS maps a stage directly to the role that OWNS work in it + # (analysis->analyst, architecture->architect, ...), which is exactly the + # agent we must re-run on a rollback into prev_stage. + from ..plane_sync import STAGE_AUTHORS as _STAGE_AUTHORS + prev_agent = _STAGE_AUTHORS.get(prev_stage) + if prev_agent: + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: {prev_stage}\nNote: Stakeholder REJECTED. Reason: {reason}\n" + f"Revise and improve." + ) + new_job = enqueue_job(prev_agent, repo, task_desc, task_id=task_id) + logger.info( + f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}, " + f"enqueued {prev_agent} (job_id={new_job})" + ) + else: + logger.info(f"Task {task_id}: rejected, rolled back {current_stage} \u2192 {prev_stage}") + + async def _try_advance_stage( task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str ): diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..991f025 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,40 @@ +"""Global pytest fixtures. + +test(conftest): mute Telegram in ALL tests to stop prod leakage. + +Background: a pytest run on prod was sending REAL Telegram messages to Slava, +because some tests (e.g. test_webhook_dedup advancing a stage) reach +notify_stage_change -> send_telegram, which reads the live .env +telegram_bot_token/chat_id and actually POSTs to Telegram. + +This autouse fixture stubs send_telegram to a no-op for every test: + + - "src.notifications.send_telegram" is the SOURCE. All the notify_* helpers in + notifications.py call the module-global send_telegram, and every other module + that does a *local* `from .notifications import send_telegram` inside a + function resolves it live at call time -> covered by patching the source. + + - "src.stage_engine.send_telegram" is patched too, because stage_engine binds + send_telegram as a MODULE-LEVEL name (from .notifications import send_telegram + at import), so a patch of the source alone would not intercept its 3 direct + calls. webhooks/plane and launcher import it locally inside functions, so the + source patch already covers them; they are patched defensively with + raising=False anyway in case that ever changes. + +raising=False so a module that doesn't (yet) expose the name never breaks setup. +""" + +import pytest + + +@pytest.fixture(autouse=True) +def _no_telegram(monkeypatch): + _noop = lambda *a, **k: None # noqa: E731 + # Source of truth (covers notifications.notify_* and all local re-imports). + monkeypatch.setattr("src.notifications.send_telegram", _noop, raising=False) + # Module-level binding in stage_engine (and defensive coverage elsewhere). + monkeypatch.setattr("src.stage_engine.send_telegram", _noop, raising=False) + monkeypatch.setattr("src.webhooks.plane.send_telegram", _noop, raising=False) + monkeypatch.setattr("src.agents.launcher.send_telegram", _noop, raising=False) + monkeypatch.setattr("src.queue_worker.send_telegram", _noop, raising=False) + yield diff --git a/tests/test_m6_sequence.py b/tests/test_m6_sequence.py index 83fc951..81720f0 100644 --- a/tests/test_m6_sequence.py +++ b/tests/test_m6_sequence.py @@ -102,16 +102,22 @@ def test_fetch_sequence_id_missing_field_returns_none(): # handle_work_item_created: seq available -> prefix-NNN # --------------------------------------------------------------------------- +# Feature 1: pipeline starts on a status change to In Progress, not on creation. +_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" + + def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"): return client.post( "/webhook/plane", json={ - "event": "work_item.created", + "event": "issue", + "action": "updated", "data": { "id": plane_id, "name": name, "description_stripped": "This is a sufficiently long description.", "project": plane_project_id, + "state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"}, }, }, ) diff --git a/tests/test_plane_webhook.py b/tests/test_plane_webhook.py index c213376..baf7887 100644 --- a/tests/test_plane_webhook.py +++ b/tests/test_plane_webhook.py @@ -73,16 +73,24 @@ def setup(monkeypatch): os.unlink(_test_db) +# Feature 1: the pipeline now starts on a status change to In Progress (not on +# creation). _post_created drives that status-change event so these ORCH-6 +# routing tests still exercise task creation through the new trigger. +_IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" + + def _post_created(plane_project_id, plane_id="wi-1", name="A valid work item title"): return client.post( "/webhook/plane", json={ - "event": "work_item.created", + "event": "issue", + "action": "updated", "data": { "id": plane_id, "name": name, "description_stripped": "This is a sufficiently long description.", "project": plane_project_id, + "state": {"id": _IN_PROGRESS, "name": "In Progress", "group": "started"}, }, }, ) diff --git a/tests/test_stage_visibility.py b/tests/test_stage_visibility.py new file mode 100644 index 0000000..a41f5c7 --- /dev/null +++ b/tests/test_stage_visibility.py @@ -0,0 +1,94 @@ +"""Feature 3: stage visibility on the Plane board. + + * PLANE_STATES carries the 6 new per-stage / verdict UUIDs. + * STAGE_TO_STATE maps architecture/development/review/testing to their + dedicated board statuses (not all -> In Progress anymore). + * set_issue_stage_state(work_item_id, stage) PATCHes the correct state UUID + for a visible stage, and is a no-op for stages without one (analysis/deploy). + * Needs Input / In Review / Blocked remain higher priority: their explicit + setters use their own state, never overwritten by the stage map. + +httpx is mocked; no network. +""" + +import os + +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") + +from unittest.mock import patch, MagicMock # noqa: E402 + +from src import plane_sync as PS # noqa: E402 + + +EXPECTED_UUIDS = { + "architecture": "3020bbb7-6122-4663-930c-0315ba8dfa3d", + "development": "9920609b-f140-4e46-ab95-89acda8412c8", + "review": "ba0d802c-5218-41d4-ab43-978b0ea123ed", + "testing": "7855d807-b1bf-42ef-8dae-6cde0df92d02", + "approved": "a519a341-dada-4a91-8910-7604f82b79c5", + "rejected": "ba958f3c-5db5-461d-8f82-89425e413b97", +} + + +def test_plane_states_has_new_uuids(): + for key, uuid in EXPECTED_UUIDS.items(): + assert PS.PLANE_STATES[key] == uuid + + +def test_stage_to_state_maps_visible_stages(): + assert PS.STAGE_TO_STATE["architecture"] == EXPECTED_UUIDS["architecture"] + assert PS.STAGE_TO_STATE["development"] == EXPECTED_UUIDS["development"] + assert PS.STAGE_TO_STATE["review"] == EXPECTED_UUIDS["review"] + assert PS.STAGE_TO_STATE["testing"] == EXPECTED_UUIDS["testing"] + # analysis / deploy stay on In Progress; done stays Done. + assert PS.STAGE_TO_STATE["analysis"] == PS.PLANE_STATES["in_progress"] + assert PS.STAGE_TO_STATE["deploy"] == PS.PLANE_STATES["in_progress"] + assert PS.STAGE_TO_STATE["done"] == PS.PLANE_STATES["done"] + + +def _patch_resolution(monkey_targets): + """Helper: patch find_issue_id + _resolve_project_id to skip the DB/network.""" + return monkey_targets + + +@patch("src.plane_sync.httpx.patch") +@patch("src.plane_sync.find_issue_id", return_value="issue-uuid") +@patch("src.plane_sync._resolve_project_id", return_value="proj-1") +def test_set_issue_stage_state_patches_correct_uuid(mock_proj, mock_find, mock_patch): + resp = MagicMock(); resp.raise_for_status.return_value = None + mock_patch.return_value = resp + + PS.set_issue_stage_state("ET-1", "development") + # the PATCH carried the development state UUID + _, kwargs = mock_patch.call_args + assert kwargs["json"]["state"] == EXPECTED_UUIDS["development"] + + +@patch("src.plane_sync.httpx.patch") +@patch("src.plane_sync.find_issue_id", return_value="issue-uuid") +@patch("src.plane_sync._resolve_project_id", return_value="proj-1") +def test_set_issue_stage_state_noop_for_analysis(mock_proj, mock_find, mock_patch): + # analysis has no dedicated board status -> no PATCH at all. + PS.set_issue_stage_state("ET-1", "analysis") + mock_patch.assert_not_called() + PS.set_issue_stage_state("ET-1", "deploy") + mock_patch.assert_not_called() + + +@patch("src.plane_sync.httpx.patch") +@patch("src.plane_sync.find_issue_id", return_value="issue-uuid") +@patch("src.plane_sync._resolve_project_id", return_value="proj-1") +def test_priority_states_use_their_own_uuid(mock_proj, mock_find, mock_patch): + """Needs Input / In Review / Blocked are set explicitly and take priority.""" + resp = MagicMock(); resp.raise_for_status.return_value = None + mock_patch.return_value = resp + + PS.set_issue_needs_input("ET-1") + assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["needs_input"] + + PS.set_issue_in_review("ET-1") + assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["in_review"] + + PS.set_issue_blocked("ET-1") + assert mock_patch.call_args.kwargs["json"]["state"] == PS.PLANE_STATES["blocked"] diff --git a/tests/test_status_trigger.py b/tests/test_status_trigger.py new file mode 100644 index 0000000..b4e6ef3 --- /dev/null +++ b/tests/test_status_trigger.py @@ -0,0 +1,150 @@ +"""Feature 1: pipeline starts on status -> In Progress, not on creation. + + * work_item.created / issue created -> NO task, NO branch, NO analyst. + * issue updated -> In Progress (from backlog) -> task created + analyst enqueued. + * a second In Progress update for the same issue -> NO duplicate, NO restart + (protects handle_comment, which also flips issues to In Progress). + +launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient. +""" + +import os +import tempfile + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import pytest # noqa: E402 +from unittest.mock import patch, AsyncMock # noqa: E402 +from fastapi.testclient import TestClient # noqa: E402 + +from src.main import app # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import projects as P # noqa: E402 +from src.projects import reload_projects # noqa: E402 + +ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c" +IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" +BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122" + +client = TestClient(app) + + +@pytest.fixture(autouse=True) +def setup(monkeypatch): + monkeypatch.setattr(P.settings, "db_path", _test_db) + import src.db as _db + monkeypatch.setattr(_db.settings, "db_path", _test_db) + if os.path.exists(_test_db): + os.unlink(_test_db) + init_db() + monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True) + registry_json = ( + f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",' + f' "work_item_prefix": "ET", "name": "enduro-trails"}}]' + ) + monkeypatch.setattr(P.settings, "projects_json", registry_json) + reload_projects() + yield + reload_projects() + if os.path.exists(_test_db): + os.unlink(_test_db) + + +def _created(plane_id="st-created"): + return client.post("/webhook/plane", json={ + "event": "issue", "action": "created", + "data": { + "id": plane_id, "name": "A valid backlog item title", + "description_stripped": "A sufficiently long description for QG-0.", + "project": ENDURO_PLANE_ID, + "state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"}, + }, + }) + + +def _to_in_progress(plane_id="st-1"): + return client.post("/webhook/plane", json={ + "event": "issue", "action": "updated", + "data": { + "id": plane_id, "name": "A valid backlog item title", + "description_stripped": "A sufficiently long description for QG-0.", + "project": ENDURO_PLANE_ID, + "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, + }, + "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG}, + }) + + +def _count(plane_id): + conn = get_db() + n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0] + conn.close() + return n + + +# --------------------------------------------------------------------------- # +@patch("src.webhooks.plane.enqueue_job") +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue): + resp = _created("st-created") + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + # No task, no branch, no analyst enqueue. + assert _count("st-created") == 0 + mock_branch.assert_not_called() + mock_enqueue.assert_not_called() + + +@patch("src.webhooks.plane.enqueue_job") +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) +def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue): + mock_enqueue.return_value = 1 + resp = _to_in_progress("st-1") + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + assert _count("st-1") == 1 + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone() + conn.close() + assert task["stage"] == "analysis" + assert task["repo"] == "enduro-trails" + mock_branch.assert_called_once() + # analyst enqueued exactly once + assert mock_enqueue.call_count == 1 + assert mock_enqueue.call_args.args[0] == "analyst" + + +@patch("src.webhooks.plane.enqueue_job") +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) +def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue): + mock_enqueue.return_value = 1 + _to_in_progress("st-2") + assert _count("st-2") == 1 + assert mock_enqueue.call_count == 1 + + # Second In Progress update (e.g. handle_comment re-set the status). Use a + # DISTINCT body (different activity old_value) so webhook dedup does NOT + # short-circuit it — this exercises the existing-task idempotency guard in + # handle_status_start, not the delivery-dedup layer. + resp = client.post("/webhook/plane", json={ + "event": "issue", "action": "updated", + "data": { + "id": "st-2", "name": "A valid backlog item title", + "description_stripped": "A sufficiently long description for QG-0.", + "project": ENDURO_PLANE_ID, + "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, + }, + "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"}, + }) + assert resp.status_code == 200 + assert _count("st-2") == 1 # still exactly one task + assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued diff --git a/tests/test_usage.py b/tests/test_usage.py new file mode 100644 index 0000000..59d059b --- /dev/null +++ b/tests/test_usage.py @@ -0,0 +1,176 @@ +"""Feature 4: token / cost accounting tests. + +Covers: + * parse_usage_from_text on a REAL claude --output-format json result blob + (captured live from CLI 2.1.142), including a leading text line. + * parse on garbage / missing JSON -> None (never raises). + * record_usage writes the columns; NULLs when usage is None. + * fmt_tokens / fmt_cost formatting. + * usage_comment string format. + * task_usage_summary / task_summary_comment aggregate over agent_runs. + +DB is an isolated temp file; no network or subprocess. +""" + +import os +import tempfile + +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db") +os.environ["ORCH_DB_PATH"] = _test_db + +import pytest # noqa: E402 + +from src import db as db_module # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import usage as U # noqa: E402 + + +# Real claude --output-format json result object (captured from CLI 2.1.142). +REAL_RESULT_JSON = ( + '{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,' + '"num_turns":1,"result":"Hi!","session_id":"abc",' + '"total_cost_usd":0.0560175,' + '"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,' + '"cache_read_input_tokens":18500,"output_tokens":12100,' + '"service_tier":"standard"},' + '"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},' + '"permission_denials":[]}' +) + + +@pytest.fixture(autouse=True) +def setup_db(monkeypatch): + # get_db() reads settings.db_path live; pin it to our isolated DB. + monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False) + if os.path.exists(_test_db): + os.unlink(_test_db) + init_db() + yield + if os.path.exists(_test_db): + os.unlink(_test_db) + + +# --------------------------------------------------------------------------- # +# parsing +# --------------------------------------------------------------------------- # +def test_parse_real_result_json(): + u = U.parse_usage_from_text(REAL_RESULT_JSON) + assert u is not None + assert u["input_tokens"] == 45231 + assert u["output_tokens"] == 12100 + assert u["cache_read_tokens"] == 18500 + assert abs(u["cost_usd"] - 0.0560175) < 1e-9 + + +def test_parse_with_leading_text(): + """The agent may print text before the trailing JSON; we still find it.""" + text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON + u = U.parse_usage_from_text(text) + assert u is not None + assert u["input_tokens"] == 45231 + assert u["output_tokens"] == 12100 + + +def test_parse_garbage_returns_none(): + assert U.parse_usage_from_text("not json at all { broken") is None + assert U.parse_usage_from_text("") is None + assert U.parse_usage_from_text(None) is None + + +def test_parse_json_without_usage_returns_none(): + assert U.parse_usage_from_text('{"hello":"world"}') is None + + +def test_parse_from_log_missing_file_returns_none(): + assert U.parse_usage_from_log("/no/such/file.log") is None + + +# --------------------------------------------------------------------------- # +# record_usage +# --------------------------------------------------------------------------- # +def _new_run(agent="developer", task_id=1): + conn = get_db() + cur = conn.execute("INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", (task_id, agent)) + rid = cur.lastrowid + conn.commit() + conn.close() + return rid + + +def test_record_usage_writes_columns(): + rid = _new_run() + u = U.parse_usage_from_text(REAL_RESULT_JSON) + U.record_usage(rid, u) + conn = get_db() + row = conn.execute( + "SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd " + "FROM agent_runs WHERE id=?", (rid,) + ).fetchone() + conn.close() + assert row["input_tokens"] == 45231 + assert row["output_tokens"] == 12100 + assert row["cache_read_tokens"] == 18500 + assert abs(row["cost_usd"] - 0.0560175) < 1e-9 + + +def test_record_usage_none_writes_nulls(): + rid = _new_run() + U.record_usage(rid, None) # must not raise + conn = get_db() + row = conn.execute("SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (rid,)).fetchone() + conn.close() + assert row["input_tokens"] is None + assert row["cost_usd"] is None + + +# --------------------------------------------------------------------------- # +# formatting +# --------------------------------------------------------------------------- # +def test_fmt_tokens(): + assert U.fmt_tokens(6) == "6" + assert U.fmt_tokens(1234) == "1.2k" + assert U.fmt_tokens(45231) == "45.2k" + assert U.fmt_tokens(2_500_000) == "2.5M" + assert U.fmt_tokens(None) == "0" + + +def test_fmt_cost(): + assert U.fmt_cost(0.21) == "$0.21" + assert U.fmt_cost(0.0560175) == "$0.06" + assert U.fmt_cost(None) == "$0.00" + + +def test_usage_comment_format(): + u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21} + c = U.usage_comment("developer", u) + assert "Developer" in c + assert "45.2k in" in c + assert "12.1k out" in c + assert "$0.21" in c + + +# --------------------------------------------------------------------------- # +# task summary +# --------------------------------------------------------------------------- # +def test_task_summary_aggregates_over_agents(): + # two runs for the same task: developer + tester + for agent, ti, to, cost in [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]: + rid = _new_run(agent=agent, task_id=42) + U.record_usage(rid, {"input_tokens": ti, "output_tokens": to, + "cache_read_tokens": 0, "cost_usd": cost}) + + s = U.task_usage_summary(42) + assert s["total_in"] == 1500 + assert s["total_out"] == 300 + assert abs(s["total_cost"] - 0.15) < 1e-9 + agents = {a for a, *_ in s["per_agent"]} + assert agents == {"developer", "tester"} + + comment = U.task_summary_comment(42) + assert "1.5k" in comment # total in + assert "$0.15" in comment # total cost + assert "Developer" in comment + assert "Tester" in comment diff --git a/tests/test_verdict_status.py b/tests/test_verdict_status.py new file mode 100644 index 0000000..4202d33 --- /dev/null +++ b/tests/test_verdict_status.py @@ -0,0 +1,140 @@ +"""Feature 2 (variant B): verdict statuses Approved / Rejected. + + * issue updated -> Approved : calls _try_advance_stage (== :approved: comment). + * issue updated -> Rejected : calls _rollback_stage (== :rejected: comment). + * the :approved: / :rejected: COMMENT mechanisms still work (both paths live). + +We mock the shared engine entry points (_try_advance_stage / _rollback_stage) +and assert they fire for both the status and the comment trigger, so the two +mechanisms are proven to funnel into the same logic. +""" + +import os +import tempfile + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_verdict.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import pytest # noqa: E402 +from unittest.mock import patch, AsyncMock # noqa: E402 +from fastapi.testclient import TestClient # noqa: E402 + +from src.main import app # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import projects as P # noqa: E402 +from src.projects import reload_projects # noqa: E402 + +ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c" +APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5" +REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97" + +client = TestClient(app) + + +@pytest.fixture(autouse=True) +def setup(monkeypatch): + monkeypatch.setattr(P.settings, "db_path", _test_db) + import src.db as _db + monkeypatch.setattr(_db.settings, "db_path", _test_db) + if os.path.exists(_test_db): + os.unlink(_test_db) + init_db() + monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True) + registry_json = ( + f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",' + f' "work_item_prefix": "ET", "name": "enduro-trails"}}]' + ) + monkeypatch.setattr(P.settings, "projects_json", registry_json) + reload_projects() + # Seed a task at the 'review' stage for plane_id 'v-1'. + conn = get_db() + conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) " + "VALUES (?, ?, ?, ?, ?, ?)", + ("v-1", "ET-500", "enduro-trails", "feature/ET-500-x", "review", "v-1"), + ) + conn.commit() + conn.close() + yield + reload_projects() + if os.path.exists(_test_db): + os.unlink(_test_db) + + +def _status(state_id, plane_id="v-1", old="prev"): + return client.post("/webhook/plane", json={ + "event": "issue", "action": "updated", + "data": { + "id": plane_id, "name": "Verdict task", "project": ENDURO_PLANE_ID, + "state": {"id": state_id, "name": "X", "group": "started"}, + }, + "activity": {"field": "state", "new_value": state_id, "old_value": old}, + }) + + +def _comment(text, plane_id="v-1"): + return client.post("/webhook/plane", json={ + "event": "issue_comment", "action": "created", + "data": {"work_item_id": plane_id, "comment_stripped": text, + "project": ENDURO_PLANE_ID}, + }) + + +# --------------------------------------------------------------------------- # +# Approved status -> advance +# --------------------------------------------------------------------------- # +@patch("src.plane_sync.set_issue_in_progress") +@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock) +def test_approved_status_advances(mock_advance, mock_sip): + resp = _status(APPROVED) + assert resp.status_code == 200 + mock_advance.assert_awaited_once() + # advanced the right task (ET-500 at review) + args = mock_advance.call_args.args + assert "ET-500" in args # work_item_id is passed positionally + + +@patch("src.plane_sync.set_issue_in_progress") +@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock) +def test_approved_comment_still_advances(mock_advance, mock_sip): + resp = _comment(":approved:") + assert resp.status_code == 200 + mock_advance.assert_awaited_once() + + +# --------------------------------------------------------------------------- # +# Rejected status -> rollback +# --------------------------------------------------------------------------- # +@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock) +def test_rejected_status_rolls_back(mock_rollback): + resp = _status(REJECTED) + assert resp.status_code == 200 + mock_rollback.assert_awaited_once() + # reason note for a status reject (no inline reason available) + kwargs_reason = mock_rollback.call_args.args[-1] + assert "rejected via status" in kwargs_reason + + +@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock) +def test_rejected_comment_still_rolls_back(mock_rollback): + resp = _comment(":rejected: bad ADR") + assert resp.status_code == 200 + mock_rollback.assert_awaited_once() + reason = mock_rollback.call_args.args[-1] + assert "bad ADR" in reason + + +# --------------------------------------------------------------------------- # +# Unknown verdict status -> no-op +# --------------------------------------------------------------------------- # +@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock) +@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock) +def test_other_status_no_verdict_action(mock_advance, mock_rollback): + # In Review status is not a verdict -> neither advance nor rollback. + resp = _status("38fb1f64-aa1e-48a3-92e0-0b109679046b") # in_review + assert resp.status_code == 200 + mock_advance.assert_not_called() + mock_rollback.assert_not_called() diff --git a/tests/test_webhook_dedup.py b/tests/test_webhook_dedup.py index 94f79e4..7fcb987 100644 --- a/tests/test_webhook_dedup.py +++ b/tests/test_webhook_dedup.py @@ -211,14 +211,21 @@ def test_gitea_fallback_hash_when_no_delivery_header(): @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue): - """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.""" + """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate. + + Feature 1: the pipeline now starts on a status change to In Progress, not on + creation, so this drives the dedup test with an 'issue updated' event. + """ + IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" body = { - "event": "work_item.created", + "event": "issue", + "action": "updated", "data": { "id": "pd-001", "name": "Dedup plane task", "description_stripped": "A sufficiently long description for QG-0 to pass.", "project": "proj-1", + "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, } r1 = client.post("/webhook/plane", json=body)