Files
orchestrator/src/usage.py
Dev Agent 9a702a0216 feat(metrics): per-agent token/cost accounting
Feature 4. claude is now launched with --output-format json; the run-log trailing
result JSON is parsed (defensively, never fatal) for usage + total_cost_usd. New
idempotent ALTERs add input_tokens/output_tokens/cache_read_tokens/cost_usd to
agent_runs; the launcher monitor records usage per run, posts a per-agent finish
comment under that agent bot (e.g. Developer gotov · 45.2k in / 12.1k out · $0.21),
and the deployer posts an end-of-task summary (SUM over agent_runs GROUP BY agent)
on done. New src/usage.py holds parse/format/record/summary helpers; test_usage.py
covers parsing a real CLI JSON blob, NULL-on-garbage, recording, formatting, and the
per-task aggregate.
2026-06-03 18:18:46 +03:00

269 lines
8.2 KiB
Python

"""Feature 4: token / cost accounting for agent runs.
claude --output-format json emits a single result JSON object at the end of the
run log with fields:
total_cost_usd
usage.input_tokens / output_tokens / cache_read_input_tokens /
cache_creation_input_tokens
modelUsage, num_turns, duration_ms
This module parses that JSON out of a (text-or-json) run log, records the usage
on the agent_runs row, formats a Plane comment for the finishing agent, and
builds the per-task summary the Deployer posts on deploy/done.
Everything here is defensive: a missing or garbled JSON never raises. Instead we
record NULL/0 and log a warning, so a broken agent run cannot crash the monitor.
"""
import json
import logging
from .db import get_db
# Module-level logger; the name is hard-coded rather than derived from __name__.
logger = logging.getLogger("orchestrator.usage")
def parse_usage_from_text(text: str) -> dict | None:
"""Extract the claude result-JSON usage from a run log's text.
The log may contain plain text before/after the JSON; with
--output-format json the JSON is the final object. We scan for the LAST
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
Returns a normalised dict
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
"""
if not text:
return None
candidate = _extract_last_json_object(text)
if candidate is None:
return None
usage = candidate.get("usage") or {}
if not isinstance(usage, dict):
usage = {}
cost = candidate.get("total_cost_usd")
if cost is None:
cost = candidate.get("cost_usd")
# If there is neither a usage block nor a cost, this isn't a result object.
if not usage and cost is None:
return None
def _int(v):
try:
return int(v)
except (TypeError, ValueError):
return 0
def _float(v):
try:
return float(v)
except (TypeError, ValueError):
return 0.0
return {
"input_tokens": _int(usage.get("input_tokens")),
"output_tokens": _int(usage.get("output_tokens")),
"cache_read_tokens": _int(
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
),
"cost_usd": _float(cost),
}
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
Scans from the end for '}' and walks back to the matching '{' using a depth
counter (string-aware), trying json.loads on each candidate. Robust to log
lines or text emitted before the JSON.
"""
# Fast path: the whole stripped text is the JSON.
stripped = text.strip()
try:
obj = json.loads(stripped)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
# Otherwise find the last balanced { ... } block.
end = len(text)
while True:
close = text.rfind("}", 0, end)
if close == -1:
return None
depth = 0
in_str = False
esc = False
start = None
for i in range(close, -1, -1):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "}":
depth += 1
elif ch == "{":
depth -= 1
if depth == 0:
start = i
break
if start is not None:
blob = text[start:close + 1]
try:
obj = json.loads(blob)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
end = close # keep scanning earlier in the text
def parse_usage_from_log(path: str) -> dict | None:
    """Read a run log file and parse usage from it. Never raises.

    Args:
        path: Filesystem path of the run log.

    Returns:
        The normalised usage dict produced by ``parse_usage_from_text``, or
        ``None`` when the file cannot be read or no usage can be parsed.
    """
    try:
        # errors="replace": undecodable bytes in the log must not abort the read.
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()
    except (OSError, TypeError, ValueError) as e:
        # TypeError / ValueError cover an invalid path argument (None, NUL byte).
        logger.warning(f"parse_usage_from_log: cannot read {path}: {e}")
        return None
    try:
        return parse_usage_from_text(text)
    except Exception as e:
        # Boundary guard: a parser failure must never reach the caller, as
        # this function is documented as never raising.
        logger.warning(f"parse_usage_from_log: cannot parse {path}: {e}")
        return None
def record_usage(run_id: int, usage: dict | None):
    """Persist parsed usage on the agent_runs row identified by ``run_id``.

    When ``usage`` is None a warning is logged and all four usage columns
    are written as NULL. Keys missing from ``usage`` are written as NULL too.
    The connection obtained from ``get_db()`` is always closed.
    """
    if usage is None:
        logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
        usage = {}
    statement = (
        "UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
        "cache_read_tokens=?, cost_usd=? WHERE id=?"
    )
    db = get_db()
    try:
        values = [
            usage.get(column)
            for column in (
                "input_tokens",
                "output_tokens",
                "cache_read_tokens",
                "cost_usd",
            )
        ]
        db.execute(statement, (*values, run_id))
        db.commit()
    finally:
        db.close()
def fmt_tokens(n) -> str:
    """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.

    Args:
        n: Token count; ``None``, non-numeric or non-finite values count as 0.

    Returns:
        The count as a plain integer below 1000, otherwise with one decimal
        and a ``k`` / ``M`` suffix.
    """
    try:
        count = int(n or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")).
        count = 0
    # Switch to "M" at 999_950 rather than 1_000_000: from there on, one
    # decimal of thousands would round up and print as "1000.0k".
    if count >= 999_950:
        return f"{count / 1_000_000:.1f}M"
    if count >= 1_000:
        return f"{count / 1_000:.1f}k"
    return str(count)
def fmt_cost(c) -> str:
    """Render a USD amount with two decimals, e.g. '$0.21'.

    Falsy or non-numeric input is rendered as '$0.00'.
    """
    try:
        amount = float(c) if c else 0.0
    except (TypeError, ValueError):
        amount = 0.0
    return "$" + format(amount, ".2f")
# Human-readable agent names used in comments (mirrors STAGE_AUTHORS roles).
# Each display name is simply the capitalised role key.
AGENT_DISPLAY = {
    role: role.capitalize()
    for role in (
        "analyst",
        "architect",
        "developer",
        "reviewer",
        "tester",
        "deployer",
    )
}
def usage_comment(agent: str, usage: dict | None) -> str:
    """Build the per-agent finish comment.

    The comment consists of the agent's icon, its display name, the Russian
    word for "ready", the input/output token counts and the cost, with the
    sections separated by middle dots. Agents not listed in AGENT_DISPLAY /
    AGENT_ICON fall back to the capitalised agent key and a check-mark icon.
    A ``None`` usage is treated as an empty dict.
    """
    stats = usage if usage else {}
    label = AGENT_DISPLAY.get(agent, agent.capitalize())
    emoji = AGENT_ICON.get(agent, "\u2705")
    tokens_in = fmt_tokens(stats.get("input_tokens"))
    tokens_out = fmt_tokens(stats.get("output_tokens"))
    price = fmt_cost(stats.get("cost_usd"))
    return (
        f"{emoji} {label} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
        f"{tokens_in} in / "
        f"{tokens_out} out \u00b7 "
        f"{price}"
    )
# Emoji shown in front of each agent's finish comment, keyed by agent role.
AGENT_ICON = dict(
    analyst="\U0001f50d",
    architect="\U0001f4d0",
    developer="\U0001f4bb",
    reviewer="\U0001f50e",
    tester="\U0001f9ea",
    deployer="\U0001f680",
)
def task_usage_summary(task_id: int) -> dict:
    """Sum the recorded usage of every agent run belonging to a task.

    Runs one grouped query against agent_runs (NULL sums are coalesced to 0)
    and returns
    {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}
    with per_agent ordered by agent name.
    """
    query = (
        "SELECT agent, "
        "COALESCE(SUM(input_tokens),0), "
        "COALESCE(SUM(output_tokens),0), "
        "COALESCE(SUM(cost_usd),0.0) "
        "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent"
    )
    db = get_db()
    try:
        cursor = db.execute(query, (task_id,))
        fetched = cursor.fetchall()
    finally:
        db.close()

    # Coerce each row to (agent, in, out, cost) with plain Python numbers.
    per_agent = [
        (row[0], int(row[1]), int(row[2]), float(row[3])) for row in fetched
    ]
    return dict(
        total_in=sum(entry[1] for entry in per_agent),
        total_out=sum(entry[2] for entry in per_agent),
        total_cost=sum(entry[3] for entry in per_agent),
        per_agent=per_agent,
    )
def task_summary_comment(task_id: int) -> str:
    """Build the Deployer end-of-task summary comment (Feature 4, variant B).

    The first line carries the task-wide totals (input tokens, output tokens,
    cost); it is followed by one bullet line per agent, joined with newlines.
    """
    summary = task_usage_summary(task_id)
    grand_in = fmt_tokens(summary["total_in"])
    grand_out = fmt_tokens(summary["total_out"])
    grand_cost = fmt_cost(summary["total_cost"])
    header = (
        f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
        f"{grand_in} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
        f"{grand_out} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
        f"{grand_cost}"
    )
    bullets = []
    for agent, tokens_in, tokens_out, cost in summary["per_agent"]:
        label = AGENT_DISPLAY.get(agent, agent.capitalize())
        bullets.append(
            f"\u2022 {label}: {fmt_tokens(tokens_in)} in / {fmt_tokens(tokens_out)} out \u00b7 {fmt_cost(cost)}"
        )
    return "\n".join([header, *bullets])