feat(metrics): per-agent token/cost accounting

Feature 4. claude is now launched with --output-format json; the run-log trailing
result JSON is parsed (defensively, never fatal) for usage + total_cost_usd. New
idempotent ALTERs add input_tokens/output_tokens/cache_read_tokens/cost_usd to
agent_runs; the launcher monitor records usage per run, posts a per-agent finish
comment under that agent bot (e.g. Developer gotov · 45.2k in / 12.1k out · $0.21),
and the deployer posts an end-of-task summary (SUM over agent_runs GROUP BY agent)
on done. New src/usage.py holds parse/format/record/summary helpers; test_usage.py
covers parsing a real CLI JSON blob, NULL-on-garbage, recording, formatting, and the
per-task aggregate.
This commit is contained in:
Dev Agent
2026-06-03 18:18:46 +03:00
parent 38a741d24e
commit 9a702a0216
4 changed files with 502 additions and 0 deletions

View File

@@ -209,9 +209,15 @@ class AgentLauncher:
# No git fetch/checkout here: ensure_worktree() already put the worktree on
# the right branch. The agent simply runs inside its isolated work_path.
# Feature 4 (token usage): --output-format json makes claude emit a single
# result JSON (with usage + total_cost_usd) at the end of stdout. The log
# still captures it; _monitor_agent parses the trailing JSON after the run
# to record per-agent tokens/cost. _monitor_agent's failure handling keys
# off the process exit_code (not stdout shape), so this is safe.
cmd = (
f'cd {work_path} && '
f'{self.CLAUDE_BIN} --print '
f'--output-format json '
f'{model_flag}'
f'"$(cat {task_file})" '
f'--system-prompt "$(cat {system_prompt})" '
@@ -400,6 +406,17 @@ class AgentLauncher:
notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s)
# Feature 4: parse token usage / cost from the (json) run log and record
# it on the agent_runs row. Never fatal — a garbled/missing JSON records
# NULLs and logs a warning so a broken run can't crash the monitor.
try:
from ..usage import parse_usage_from_log, record_usage
_usage = parse_usage_from_log(output_path) if output_path else None
record_usage(run_id, _usage)
except Exception as e:
logger.warning(f"run_id={run_id}: usage accounting failed: {e}")
_usage = None
# Commit and push any changes — in the per-branch worktree (ORCH-2 / S-4),
# NOT in the shared /repos/<repo>. The worktree is already on `branch`
# (ensure_worktree did the checkout), so no checkout is needed here.
@@ -490,6 +507,14 @@ class AgentLauncher:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log")
# Feature 4: post the per-agent usage comment under that agent's bot, and
# — for the deployer finishing the task — the per-task usage summary.
if exit_code == 0:
try:
self._post_usage_comments(run_id, agent, repo, branch, _usage)
except Exception as e:
logger.warning(f"run_id={run_id}: usage comment failed: {e}")
# Auto-advance stage if agent finished successfully and QG passes
if exit_code == 0:
self._try_advance_stage(run_id, agent, repo, branch)
@@ -654,6 +679,32 @@ class AgentLauncher:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
def _post_usage_comments(self, run_id, agent, repo, branch, usage):
"""Feature 4: post the per-agent usage comment (and Deployer summary).
- Always (on success, with a work_item_id): a per-agent finish comment
with token/cost, authored by the finishing agent's Plane bot.
- When the deployer finishes: also a per-task summary (SUM over
agent_runs GROUP BY agent), authored by the deployer.
"""
from ..usage import usage_comment, task_summary_comment
conn = get_db()
row = conn.execute(
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
(repo, branch),
).fetchone()
conn.close()
if not row:
return
task_id, work_item_id = row[0], row[1]
if not work_item_id:
return
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
if agent == "deployer":
plane_add_comment(
work_item_id, task_summary_comment(task_id), author="deployer"
)
def _ensure_pr(self, repo: str, branch: str, run_id: int):
import httpx
owner = settings.gitea_owner

View File

@@ -77,6 +77,13 @@ def init_db():
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
)
# Feature 4 (token usage): per-run token / cost accounting. Parsed from the
# claude --output-format json result by the launcher monitor. Idempotent
# ALTERs (no-op once the columns exist) so this is safe on the live prod DB.
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
conn.commit()
conn.close()

268
src/usage.py Normal file
View File

@@ -0,0 +1,268 @@
"""Feature 4: token / cost accounting for agent runs.
claude --output-format json emits a single result JSON object at the end of the
run log with fields:
total_cost_usd
usage.input_tokens / output_tokens / cache_read_input_tokens /
cache_creation_input_tokens
modelUsage, num_turns, duration_ms
This module parses that JSON out of a (text-or-json) run log, records the usage
on the agent_runs row, formats a Plane comment for the finishing agent, and
builds the per-task summary the Deployer posts on deploy/done.
Everything here is defensive: a missing/garbled JSON never raises \u2014 we record
NULL/0 and log a warning so a broken agent run can't crash the monitor.
"""
import json
import logging
from .db import get_db
logger = logging.getLogger("orchestrator.usage")
def parse_usage_from_text(text: str) -> dict | None:
"""Extract the claude result-JSON usage from a run log's text.
The log may contain plain text before/after the JSON; with
--output-format json the JSON is the final object. We scan for the LAST
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
Returns a normalised dict
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
"""
if not text:
return None
candidate = _extract_last_json_object(text)
if candidate is None:
return None
usage = candidate.get("usage") or {}
if not isinstance(usage, dict):
usage = {}
cost = candidate.get("total_cost_usd")
if cost is None:
cost = candidate.get("cost_usd")
# If there is neither a usage block nor a cost, this isn't a result object.
if not usage and cost is None:
return None
def _int(v):
try:
return int(v)
except (TypeError, ValueError):
return 0
def _float(v):
try:
return float(v)
except (TypeError, ValueError):
return 0.0
return {
"input_tokens": _int(usage.get("input_tokens")),
"output_tokens": _int(usage.get("output_tokens")),
"cache_read_tokens": _int(
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
),
"cost_usd": _float(cost),
}
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
Scans from the end for '}' and walks back to the matching '{' using a depth
counter (string-aware), trying json.loads on each candidate. Robust to log
lines or text emitted before the JSON.
"""
# Fast path: the whole stripped text is the JSON.
stripped = text.strip()
try:
obj = json.loads(stripped)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
# Otherwise find the last balanced { ... } block.
end = len(text)
while True:
close = text.rfind("}", 0, end)
if close == -1:
return None
depth = 0
in_str = False
esc = False
start = None
for i in range(close, -1, -1):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "}":
depth += 1
elif ch == "{":
depth -= 1
if depth == 0:
start = i
break
if start is not None:
blob = text[start:close + 1]
try:
obj = json.loads(blob)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
end = close # keep scanning earlier in the text
def parse_usage_from_log(path: str) -> dict | None:
"""Read a run log file and parse usage from it. Never raises."""
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return parse_usage_from_text(f.read())
except OSError as e:
logger.warning(f"parse_usage_from_log: cannot read {path}: {e}")
return None
def record_usage(run_id: int, usage: dict | None):
"""Write parsed usage onto the agent_runs row. NULLs if usage is None."""
if usage is None:
logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
usage = {}
conn = get_db()
try:
conn.execute(
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
"cache_read_tokens=?, cost_usd=? WHERE id=?",
(
usage.get("input_tokens"),
usage.get("output_tokens"),
usage.get("cache_read_tokens"),
usage.get("cost_usd"),
run_id,
),
)
conn.commit()
finally:
conn.close()
def fmt_tokens(n) -> str:
"""Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'."""
try:
n = int(n or 0)
except (TypeError, ValueError):
n = 0
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}M"
if n >= 1_000:
return f"{n / 1_000:.1f}k"
return str(n)
def fmt_cost(c) -> str:
"""Format USD cost with 2 decimals: '$0.21'."""
try:
c = float(c or 0.0)
except (TypeError, ValueError):
c = 0.0
return f"${c:.2f}"
# Pretty agent names for comments (mirrors STAGE_AUTHORS roles).
AGENT_DISPLAY = {
"analyst": "Analyst",
"architect": "Architect",
"developer": "Developer",
"reviewer": "Reviewer",
"tester": "Tester",
"deployer": "Deployer",
}
def usage_comment(agent: str, usage: dict | None) -> str:
"""Build the per-agent finish comment, e.g.
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
"""
usage = usage or {}
name = AGENT_DISPLAY.get(agent, agent.capitalize())
icon = AGENT_ICON.get(agent, "\u2705")
return (
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
f"{fmt_tokens(usage.get('input_tokens'))} in / "
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
f"{fmt_cost(usage.get('cost_usd'))}"
)
AGENT_ICON = {
"analyst": "\U0001f50d",
"architect": "\U0001f4d0",
"developer": "\U0001f4bb",
"reviewer": "\U0001f50e",
"tester": "\U0001f9ea",
"deployer": "\U0001f680",
}
def task_usage_summary(task_id: int) -> dict:
"""Aggregate agent_runs usage for a task.
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT agent, "
"COALESCE(SUM(input_tokens),0), "
"COALESCE(SUM(output_tokens),0), "
"COALESCE(SUM(cost_usd),0.0) "
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
(task_id,),
).fetchall()
finally:
conn.close()
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
total_in = sum(r[1] for r in per_agent)
total_out = sum(r[2] for r in per_agent)
total_cost = sum(r[3] for r in per_agent)
return {
"total_in": total_in,
"total_out": total_out,
"total_cost": total_cost,
"per_agent": per_agent,
}
def task_summary_comment(task_id: int) -> str:
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
s = task_usage_summary(task_id)
lines = [
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
f"{fmt_cost(s['total_cost'])}"
]
for agent, ti, to, cost in s["per_agent"]:
name = AGENT_DISPLAY.get(agent, agent.capitalize())
lines.append(
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
)
return "\n".join(lines)

176
tests/test_usage.py Normal file
View File

@@ -0,0 +1,176 @@
"""Feature 4: token / cost accounting tests.
Covers:
* parse_usage_from_text on a REAL claude --output-format json result blob
(captured live from CLI 2.1.142), including a leading text line.
* parse on garbage / missing JSON -> None (never raises).
* record_usage writes the columns; NULLs when usage is None.
* fmt_tokens / fmt_cost formatting.
* usage_comment string format.
* task_usage_summary / task_summary_comment aggregate over agent_runs.
DB is an isolated temp file; no network or subprocess.
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db")
os.environ["ORCH_DB_PATH"] = _test_db
import pytest # noqa: E402
from src import db as db_module # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import usage as U # noqa: E402
# Real claude --output-format json result object (captured from CLI 2.1.142).
REAL_RESULT_JSON = (
'{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,'
'"num_turns":1,"result":"Hi!","session_id":"abc",'
'"total_cost_usd":0.0560175,'
'"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,'
'"cache_read_input_tokens":18500,"output_tokens":12100,'
'"service_tier":"standard"},'
'"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},'
'"permission_denials":[]}'
)
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
# get_db() reads settings.db_path live; pin it to our isolated DB.
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
# --------------------------------------------------------------------------- #
# parsing
# --------------------------------------------------------------------------- #
def test_parse_real_result_json():
u = U.parse_usage_from_text(REAL_RESULT_JSON)
assert u is not None
assert u["input_tokens"] == 45231
assert u["output_tokens"] == 12100
assert u["cache_read_tokens"] == 18500
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
def test_parse_with_leading_text():
"""The agent may print text before the trailing JSON; we still find it."""
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
u = U.parse_usage_from_text(text)
assert u is not None
assert u["input_tokens"] == 45231
assert u["output_tokens"] == 12100
def test_parse_garbage_returns_none():
assert U.parse_usage_from_text("not json at all { broken") is None
assert U.parse_usage_from_text("") is None
assert U.parse_usage_from_text(None) is None
def test_parse_json_without_usage_returns_none():
assert U.parse_usage_from_text('{"hello":"world"}') is None
def test_parse_from_log_missing_file_returns_none():
assert U.parse_usage_from_log("/no/such/file.log") is None
# --------------------------------------------------------------------------- #
# record_usage
# --------------------------------------------------------------------------- #
def _new_run(agent="developer", task_id=1):
conn = get_db()
cur = conn.execute("INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", (task_id, agent))
rid = cur.lastrowid
conn.commit()
conn.close()
return rid
def test_record_usage_writes_columns():
rid = _new_run()
u = U.parse_usage_from_text(REAL_RESULT_JSON)
U.record_usage(rid, u)
conn = get_db()
row = conn.execute(
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
"FROM agent_runs WHERE id=?", (rid,)
).fetchone()
conn.close()
assert row["input_tokens"] == 45231
assert row["output_tokens"] == 12100
assert row["cache_read_tokens"] == 18500
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
def test_record_usage_none_writes_nulls():
rid = _new_run()
U.record_usage(rid, None) # must not raise
conn = get_db()
row = conn.execute("SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (rid,)).fetchone()
conn.close()
assert row["input_tokens"] is None
assert row["cost_usd"] is None
# --------------------------------------------------------------------------- #
# formatting
# --------------------------------------------------------------------------- #
def test_fmt_tokens():
assert U.fmt_tokens(6) == "6"
assert U.fmt_tokens(1234) == "1.2k"
assert U.fmt_tokens(45231) == "45.2k"
assert U.fmt_tokens(2_500_000) == "2.5M"
assert U.fmt_tokens(None) == "0"
def test_fmt_cost():
assert U.fmt_cost(0.21) == "$0.21"
assert U.fmt_cost(0.0560175) == "$0.06"
assert U.fmt_cost(None) == "$0.00"
def test_usage_comment_format():
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
c = U.usage_comment("developer", u)
assert "Developer" in c
assert "45.2k in" in c
assert "12.1k out" in c
assert "$0.21" in c
# --------------------------------------------------------------------------- #
# task summary
# --------------------------------------------------------------------------- #
def test_task_summary_aggregates_over_agents():
# two runs for the same task: developer + tester
for agent, ti, to, cost in [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]:
rid = _new_run(agent=agent, task_id=42)
U.record_usage(rid, {"input_tokens": ti, "output_tokens": to,
"cache_read_tokens": 0, "cost_usd": cost})
s = U.task_usage_summary(42)
assert s["total_in"] == 1500
assert s["total_out"] == 300
assert abs(s["total_cost"] - 0.15) < 1e-9
agents = {a for a, *_ in s["per_agent"]}
assert agents == {"developer", "tester"}
comment = U.task_summary_comment(42)
assert "1.5k" in comment # total in
assert "$0.15" in comment # total cost
assert "Developer" in comment
assert "Tester" in comment