diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 75b7cb8..dd2be7a 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -209,9 +209,15 @@ class AgentLauncher: # No git fetch/checkout here: ensure_worktree() already put the worktree on # the right branch. The agent simply runs inside its isolated work_path. + # Feature 4 (token usage): --output-format json makes claude emit a single + # result JSON (with usage + total_cost_usd) at the end of stdout. The log + # still captures it; _monitor_agent parses the trailing JSON after the run + # to record per-agent tokens/cost. _monitor_agent's failure handling keys + # off the process exit_code (not stdout shape), so this is safe. cmd = ( f'cd {work_path} && ' f'{self.CLAUDE_BIN} --print ' + f'--output-format json ' f'{model_flag}' f'"$(cat {task_file})" ' f'--system-prompt "$(cat {system_prompt})" ' @@ -400,6 +406,17 @@ class AgentLauncher: notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s) + # Feature 4: parse token usage / cost from the (json) run log and record + # it on the agent_runs row. Never fatal — a garbled/missing JSON records + # NULLs and logs a warning so a broken run can't crash the monitor. + try: + from ..usage import parse_usage_from_log, record_usage + _usage = parse_usage_from_log(output_path) if output_path else None + record_usage(run_id, _usage) + except Exception as e: + logger.warning(f"run_id={run_id}: usage accounting failed: {e}") + _usage = None + # Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4), # NOT in the shared /repos/. The worktree is already on `branch` # (ensure_worktree did the checkout), so no checkout is needed here. @@ -490,6 +507,14 @@ class AgentLauncher: from ..notifications import send_telegram send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log") + # Feature 4: post the per-agent usage comment under that agent's bot, and + # — for the deployer finishing the task — the per-task usage summary. + if exit_code == 0: + try: + self._post_usage_comments(run_id, agent, repo, branch, _usage) + except Exception as e: + logger.warning(f"run_id={run_id}: usage comment failed: {e}") + # Auto-advance stage if agent finished successfully and QG passes if exit_code == 0: self._try_advance_stage(run_id, agent, repo, branch) @@ -654,6 +679,32 @@ class AgentLauncher: logger.error(f"Auto-advance failed for run_id={run_id}: {e}") + def _post_usage_comments(self, run_id, agent, repo, branch, usage): + """Feature 4: post the per-agent usage comment (and Deployer summary). + + - Always (on success, with a work_item_id): a per-agent finish comment + with token/cost, authored by the finishing agent's Plane bot. + - When the deployer finishes: also a per-task summary (SUM over + agent_runs GROUP BY agent), authored by the deployer. + """ + from ..usage import usage_comment, task_summary_comment + conn = get_db() + row = conn.execute( + "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?", + (repo, branch), + ).fetchone() + conn.close() + if not row: + return + task_id, work_item_id = row[0], row[1] + if not work_item_id: + return + plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent) + if agent == "deployer": + plane_add_comment( + work_item_id, task_summary_comment(task_id), author="deployer" + ) + def _ensure_pr(self, repo: str, branch: str, run_id: int): import httpx owner = settings.gitea_owner diff --git a/src/db.py b/src/db.py index 0b77610..f8bfa9d 100644 --- a/src/db.py +++ b/src/db.py @@ -77,6 +77,13 @@ def init_db(): "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery " "ON events(delivery_id) WHERE delivery_id IS NOT NULL" ) + # Feature 4 (token usage): per-run token / cost accounting. Parsed from the + # claude --output-format json result by the launcher monitor. Idempotent + # ALTERs (no-op once the columns exist) so this is safe on the live prod DB. + _ensure_column(conn, "agent_runs", "input_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "output_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER") + _ensure_column(conn, "agent_runs", "cost_usd", "REAL") conn.commit() conn.close() diff --git a/src/usage.py b/src/usage.py new file mode 100644 index 0000000..8968381 --- /dev/null +++ b/src/usage.py @@ -0,0 +1,268 @@ +"""Feature 4: token / cost accounting for agent runs. + +claude --output-format json emits a single result JSON object at the end of the +run log with fields: + total_cost_usd + usage.input_tokens / output_tokens / cache_read_input_tokens / + cache_creation_input_tokens + modelUsage, num_turns, duration_ms + +This module parses that JSON out of a (text-or-json) run log, records the usage +on the agent_runs row, formats a Plane comment for the finishing agent, and +builds the per-task summary the Deployer posts on deploy/done. + +Everything here is defensive: a missing/garbled JSON never raises \u2014 we record +NULL/0 and log a warning so a broken agent run can't crash the monitor. +""" + +import json +import logging + +from .db import get_db + +logger = logging.getLogger("orchestrator.usage") + + +def parse_usage_from_text(text: str) -> dict | None: + """Extract the claude result-JSON usage from a run log's text. + + The log may contain plain text before/after the JSON; with + --output-format json the JSON is the final object. We scan for the LAST + top-level '{' ... '}' that parses and carries usage/total_cost_usd. + + Returns a normalised dict + {input_tokens, output_tokens, cache_read_tokens, cost_usd} + (ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found. + """ + if not text: + return None + + candidate = _extract_last_json_object(text) + if candidate is None: + return None + + usage = candidate.get("usage") or {} + if not isinstance(usage, dict): + usage = {} + + cost = candidate.get("total_cost_usd") + if cost is None: + cost = candidate.get("cost_usd") + + # If there is neither a usage block nor a cost, this isn't a result object. + if not usage and cost is None: + return None + + def _int(v): + try: + return int(v) + except (TypeError, ValueError): + return 0 + + def _float(v): + try: + return float(v) + except (TypeError, ValueError): + return 0.0 + + return { + "input_tokens": _int(usage.get("input_tokens")), + "output_tokens": _int(usage.get("output_tokens")), + "cache_read_tokens": _int( + usage.get("cache_read_input_tokens", usage.get("cache_read_tokens")) + ), + "cost_usd": _float(cost), + } + + +def _extract_last_json_object(text: str) -> dict | None: + """Return the last balanced top-level JSON object in `text` that parses. + + Scans from the end for '}' and walks back to the matching '{' using a depth + counter (string-aware), trying json.loads on each candidate. Robust to log + lines or text emitted before the JSON. + """ + # Fast path: the whole stripped text is the JSON. + stripped = text.strip() + try: + obj = json.loads(stripped) + if isinstance(obj, dict): + return obj + except (ValueError, TypeError): + pass + + # Otherwise find the last balanced { ... } block. + end = len(text) + while True: + close = text.rfind("}", 0, end) + if close == -1: + return None + depth = 0 + in_str = False + esc = False + start = None + for i in range(close, -1, -1): + ch = text[i] + if in_str: + if esc: + esc = False + elif ch == "\\": + esc = True + elif ch == '"': + in_str = False + continue + if ch == '"': + in_str = True + elif ch == "}": + depth += 1 + elif ch == "{": + depth -= 1 + if depth == 0: + start = i + break + if start is not None: + blob = text[start:close + 1] + try: + obj = json.loads(blob) + if isinstance(obj, dict): + return obj + except (ValueError, TypeError): + pass + end = close # keep scanning earlier in the text + + +def parse_usage_from_log(path: str) -> dict | None: + """Read a run log file and parse usage from it. Never raises.""" + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + return parse_usage_from_text(f.read()) + except OSError as e: + logger.warning(f"parse_usage_from_log: cannot read {path}: {e}") + return None + + +def record_usage(run_id: int, usage: dict | None): + """Write parsed usage onto the agent_runs row. NULLs if usage is None.""" + if usage is None: + logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs") + usage = {} + conn = get_db() + try: + conn.execute( + "UPDATE agent_runs SET input_tokens=?, output_tokens=?, " + "cache_read_tokens=?, cost_usd=? WHERE id=?", + ( + usage.get("input_tokens"), + usage.get("output_tokens"), + usage.get("cache_read_tokens"), + usage.get("cost_usd"), + run_id, + ), + ) + conn.commit() + finally: + conn.close() + + +def fmt_tokens(n) -> str: + """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.""" + try: + n = int(n or 0) + except (TypeError, ValueError): + n = 0 + if n >= 1_000_000: + return f"{n / 1_000_000:.1f}M" + if n >= 1_000: + return f"{n / 1_000:.1f}k" + return str(n) + + +def fmt_cost(c) -> str: + """Format USD cost with 2 decimals: '$0.21'.""" + try: + c = float(c or 0.0) + except (TypeError, ValueError): + c = 0.0 + return f"${c:.2f}" + + +# Pretty agent names for comments (mirrors STAGE_AUTHORS roles). +AGENT_DISPLAY = { + "analyst": "Analyst", + "architect": "Architect", + "developer": "Developer", + "reviewer": "Reviewer", + "tester": "Tester", + "deployer": "Deployer", +} + + +def usage_comment(agent: str, usage: dict | None) -> str: + """Build the per-agent finish comment, e.g. + '\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'. + """ + usage = usage or {} + name = AGENT_DISPLAY.get(agent, agent.capitalize()) + icon = AGENT_ICON.get(agent, "\u2705") + return ( + f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 " + f"{fmt_tokens(usage.get('input_tokens'))} in / " + f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 " + f"{fmt_cost(usage.get('cost_usd'))}" + ) + + +AGENT_ICON = { + "analyst": "\U0001f50d", + "architect": "\U0001f4d0", + "developer": "\U0001f4bb", + "reviewer": "\U0001f50e", + "tester": "\U0001f9ea", + "deployer": "\U0001f680", +} + + +def task_usage_summary(task_id: int) -> dict: + """Aggregate agent_runs usage for a task. + + Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}. + """ + conn = get_db() + try: + rows = conn.execute( + "SELECT agent, " + "COALESCE(SUM(input_tokens),0), " + "COALESCE(SUM(output_tokens),0), " + "COALESCE(SUM(cost_usd),0.0) " + "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent", + (task_id,), + ).fetchall() + finally: + conn.close() + per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows] + total_in = sum(r[1] for r in per_agent) + total_out = sum(r[2] for r in per_agent) + total_cost = sum(r[3] for r in per_agent) + return { + "total_in": total_in, + "total_out": total_out, + "total_cost": total_cost, + "per_agent": per_agent, + } + + +def task_summary_comment(task_id: int) -> str: + """Build the Deployer end-of-task summary comment (Feature 4, variant B).""" + s = task_usage_summary(task_id) + lines = [ + f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: " + f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / " + f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 " + f"{fmt_cost(s['total_cost'])}" + ] + for agent, ti, to, cost in s["per_agent"]: + name = AGENT_DISPLAY.get(agent, agent.capitalize()) + lines.append( + f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}" + ) + return "\n".join(lines) diff --git a/tests/test_usage.py b/tests/test_usage.py new file mode 100644 index 0000000..59d059b --- /dev/null +++ b/tests/test_usage.py @@ -0,0 +1,176 @@ +"""Feature 4: token / cost accounting tests. + +Covers: + * parse_usage_from_text on a REAL claude --output-format json result blob + (captured live from CLI 2.1.142), including a leading text line. + * parse on garbage / missing JSON -> None (never raises). + * record_usage writes the columns; NULLs when usage is None. + * fmt_tokens / fmt_cost formatting. + * usage_comment string format. + * task_usage_summary / task_summary_comment aggregate over agent_runs. + +DB is an isolated temp file; no network or subprocess. +""" + +import os +import tempfile + +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db") +os.environ["ORCH_DB_PATH"] = _test_db + +import pytest # noqa: E402 + +from src import db as db_module # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import usage as U # noqa: E402 + + +# Real claude --output-format json result object (captured from CLI 2.1.142). +REAL_RESULT_JSON = ( + '{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,' + '"num_turns":1,"result":"Hi!","session_id":"abc",' + '"total_cost_usd":0.0560175,' + '"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,' + '"cache_read_input_tokens":18500,"output_tokens":12100,' + '"service_tier":"standard"},' + '"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},' + '"permission_denials":[]}' +) + + +@pytest.fixture(autouse=True) +def setup_db(monkeypatch): + # get_db() reads settings.db_path live; pin it to our isolated DB. + monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False) + if os.path.exists(_test_db): + os.unlink(_test_db) + init_db() + yield + if os.path.exists(_test_db): + os.unlink(_test_db) + + +# --------------------------------------------------------------------------- # +# parsing +# --------------------------------------------------------------------------- # +def test_parse_real_result_json(): + u = U.parse_usage_from_text(REAL_RESULT_JSON) + assert u is not None + assert u["input_tokens"] == 45231 + assert u["output_tokens"] == 12100 + assert u["cache_read_tokens"] == 18500 + assert abs(u["cost_usd"] - 0.0560175) < 1e-9 + + +def test_parse_with_leading_text(): + """The agent may print text before the trailing JSON; we still find it.""" + text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON + u = U.parse_usage_from_text(text) + assert u is not None + assert u["input_tokens"] == 45231 + assert u["output_tokens"] == 12100 + + +def test_parse_garbage_returns_none(): + assert U.parse_usage_from_text("not json at all { broken") is None + assert U.parse_usage_from_text("") is None + assert U.parse_usage_from_text(None) is None + + +def test_parse_json_without_usage_returns_none(): + assert U.parse_usage_from_text('{"hello":"world"}') is None + + +def test_parse_from_log_missing_file_returns_none(): + assert U.parse_usage_from_log("/no/such/file.log") is None + + +# --------------------------------------------------------------------------- # +# record_usage +# --------------------------------------------------------------------------- # +def _new_run(agent="developer", task_id=1): + conn = get_db() + cur = conn.execute("INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", (task_id, agent)) + rid = cur.lastrowid + conn.commit() + conn.close() + return rid + + +def test_record_usage_writes_columns(): + rid = _new_run() + u = U.parse_usage_from_text(REAL_RESULT_JSON) + U.record_usage(rid, u) + conn = get_db() + row = conn.execute( + "SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd " + "FROM agent_runs WHERE id=?", (rid,) + ).fetchone() + conn.close() + assert row["input_tokens"] == 45231 + assert row["output_tokens"] == 12100 + assert row["cache_read_tokens"] == 18500 + assert abs(row["cost_usd"] - 0.0560175) < 1e-9 + + +def test_record_usage_none_writes_nulls(): + rid = _new_run() + U.record_usage(rid, None) # must not raise + conn = get_db() + row = conn.execute("SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (rid,)).fetchone() + conn.close() + assert row["input_tokens"] is None + assert row["cost_usd"] is None + + +# --------------------------------------------------------------------------- # +# formatting +# --------------------------------------------------------------------------- # +def test_fmt_tokens(): + assert U.fmt_tokens(6) == "6" + assert U.fmt_tokens(1234) == "1.2k" + assert U.fmt_tokens(45231) == "45.2k" + assert U.fmt_tokens(2_500_000) == "2.5M" + assert U.fmt_tokens(None) == "0" + + +def test_fmt_cost(): + assert U.fmt_cost(0.21) == "$0.21" + assert U.fmt_cost(0.0560175) == "$0.06" + assert U.fmt_cost(None) == "$0.00" + + +def test_usage_comment_format(): + u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21} + c = U.usage_comment("developer", u) + assert "Developer" in c + assert "45.2k in" in c + assert "12.1k out" in c + assert "$0.21" in c + + +# --------------------------------------------------------------------------- # +# task summary +# --------------------------------------------------------------------------- # +def test_task_summary_aggregates_over_agents(): + # two runs for the same task: developer + tester + for agent, ti, to, cost in [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]: + rid = _new_run(agent=agent, task_id=42) + U.record_usage(rid, {"input_tokens": ti, "output_tokens": to, + "cache_read_tokens": 0, "cost_usd": cost}) + + s = U.task_usage_summary(42) + assert s["total_in"] == 1500 + assert s["total_out"] == 300 + assert abs(s["total_cost"] - 0.15) < 1e-9 + agents = {a for a, *_ in s["per_agent"]} + assert agents == {"developer", "tester"} + + comment = U.task_summary_comment(42) + assert "1.5k" in comment # total in + assert "$0.15" in comment # total cost + assert "Developer" in comment + assert "Tester" in comment