Feature 4. claude is now launched with --output-format json; the run-log trailing result JSON is parsed (defensively, never fatal) for usage + total_cost_usd. New idempotent ALTERs add input_tokens/output_tokens/cache_read_tokens/cost_usd to agent_runs; the launcher monitor records usage per run, posts a per-agent finish comment under that agent bot (e.g. Developer gotov · 45.2k in / 12.1k out · $0.21), and the deployer posts an end-of-task summary (SUM over agent_runs GROUP BY agent) on done. New src/usage.py holds parse/format/record/summary helpers; test_usage.py covers parsing a real CLI JSON blob, NULL-on-garbage, recording, formatting, and the per-task aggregate.
269 lines
8.2 KiB
Python
269 lines
8.2 KiB
Python
"""Feature 4: token / cost accounting for agent runs.
|
|
|
|
claude --output-format json emits a single result JSON object at the end of the
|
|
run log with fields:
|
|
total_cost_usd
|
|
usage.input_tokens / output_tokens / cache_read_input_tokens /
|
|
cache_creation_input_tokens
|
|
modelUsage, num_turns, duration_ms
|
|
|
|
This module parses that JSON out of a (text-or-json) run log, records the usage
|
|
on the agent_runs row, formats a Plane comment for the finishing agent, and
|
|
builds the per-task summary the Deployer posts on deploy/done.
|
|
|
|
Everything here is defensive: a missing/garbled JSON never raises — we record
NULL/0 and log a warning so a broken agent run can't crash the monitor.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
|
|
from .db import get_db
|
|
|
|
logger = logging.getLogger("orchestrator.usage")
|
|
|
|
|
|
def parse_usage_from_text(text: str) -> dict | None:
|
|
"""Extract the claude result-JSON usage from a run log's text.
|
|
|
|
The log may contain plain text before/after the JSON; with
|
|
--output-format json the JSON is the final object. We scan for the LAST
|
|
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
|
|
|
Returns a normalised dict
|
|
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
|
|
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
|
"""
|
|
if not text:
|
|
return None
|
|
|
|
candidate = _extract_last_json_object(text)
|
|
if candidate is None:
|
|
return None
|
|
|
|
usage = candidate.get("usage") or {}
|
|
if not isinstance(usage, dict):
|
|
usage = {}
|
|
|
|
cost = candidate.get("total_cost_usd")
|
|
if cost is None:
|
|
cost = candidate.get("cost_usd")
|
|
|
|
# If there is neither a usage block nor a cost, this isn't a result object.
|
|
if not usage and cost is None:
|
|
return None
|
|
|
|
def _int(v):
|
|
try:
|
|
return int(v)
|
|
except (TypeError, ValueError):
|
|
return 0
|
|
|
|
def _float(v):
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
return 0.0
|
|
|
|
return {
|
|
"input_tokens": _int(usage.get("input_tokens")),
|
|
"output_tokens": _int(usage.get("output_tokens")),
|
|
"cache_read_tokens": _int(
|
|
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
|
),
|
|
"cost_usd": _float(cost),
|
|
}
|
|
|
|
|
|
def _extract_last_json_object(text: str) -> dict | None:
|
|
"""Return the last balanced top-level JSON object in `text` that parses.
|
|
|
|
Scans from the end for '}' and walks back to the matching '{' using a depth
|
|
counter (string-aware), trying json.loads on each candidate. Robust to log
|
|
lines or text emitted before the JSON.
|
|
"""
|
|
# Fast path: the whole stripped text is the JSON.
|
|
stripped = text.strip()
|
|
try:
|
|
obj = json.loads(stripped)
|
|
if isinstance(obj, dict):
|
|
return obj
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Otherwise find the last balanced { ... } block.
|
|
end = len(text)
|
|
while True:
|
|
close = text.rfind("}", 0, end)
|
|
if close == -1:
|
|
return None
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
start = None
|
|
for i in range(close, -1, -1):
|
|
ch = text[i]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif ch == "\\":
|
|
esc = True
|
|
elif ch == '"':
|
|
in_str = False
|
|
continue
|
|
if ch == '"':
|
|
in_str = True
|
|
elif ch == "}":
|
|
depth += 1
|
|
elif ch == "{":
|
|
depth -= 1
|
|
if depth == 0:
|
|
start = i
|
|
break
|
|
if start is not None:
|
|
blob = text[start:close + 1]
|
|
try:
|
|
obj = json.loads(blob)
|
|
if isinstance(obj, dict):
|
|
return obj
|
|
except (ValueError, TypeError):
|
|
pass
|
|
end = close # keep scanning earlier in the text
|
|
|
|
|
|
def parse_usage_from_log(path: str) -> dict | None:
    """Load a run log from disk and parse the usage figures out of it.

    The file is decoded as UTF-8 with undecodable bytes replaced, so a log
    containing binary noise is still parsed.  An ``OSError`` while opening or
    reading is logged as a warning and reported as ``None``.

    Args:
        path: Filesystem path of the run log.

    Returns:
        Whatever ``parse_usage_from_text`` returns for the file contents, or
        ``None`` if the file could not be read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as log_file:
            contents = log_file.read()
        return parse_usage_from_text(contents)
    except OSError as exc:
        logger.warning(f"parse_usage_from_log: cannot read {path}: {exc}")
    return None
|
|
|
|
|
|
def record_usage(run_id: int, usage: dict | None):
    """Store parsed usage on the ``agent_runs`` row with id ``run_id``.

    Args:
        run_id: Primary key of the ``agent_runs`` row to update.
        usage: Dict as produced by ``parse_usage_from_text``.  ``None`` is
            logged as a warning and all four columns are written as NULL; any
            key missing from the dict is likewise written as NULL.
    """
    if usage is None:
        logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
        usage = {}

    columns = ("input_tokens", "output_tokens", "cache_read_tokens", "cost_usd")
    conn = get_db()
    try:
        # Column order here must match the SET clause below.
        params = tuple(usage.get(column) for column in columns) + (run_id,)
        conn.execute(
            "UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
            "cache_read_tokens=?, cost_usd=? WHERE id=?",
            params,
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def fmt_tokens(n) -> str:
    """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.

    Args:
        n: Token count.  ``None``, other falsy values and anything that cannot
            be converted with ``int()`` are treated as 0.

    Returns:
        The count with one decimal and a ``k`` / ``M`` suffix, or the plain
        integer when it is below 1000.
    """
    try:
        count = int(n or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")).
        count = 0
    # 999_950 is the first count that ``:.1f`` would round up to "1000.0k";
    # switch to the M suffix from there so the output reads "1.0M".
    if count >= 999_950:
        return f"{count / 1_000_000:.1f}M"
    if count >= 1_000:
        return f"{count / 1_000:.1f}k"
    return str(count)
|
|
|
|
|
|
def fmt_cost(c) -> str:
    """Render a USD amount with a dollar sign and two decimals, e.g. '$0.21'.

    Falsy input (``None``, 0, empty string) and values that ``float()``
    rejects with ``TypeError`` / ``ValueError`` are rendered as '$0.00'.
    """
    try:
        amount = float(c) if c else 0.0
    except (TypeError, ValueError):
        amount = 0.0
    return "${:.2f}".format(amount)
|
|
|
|
|
|
# Pretty agent names for comments, keyed by the lowercase agent identifier.
# Agents missing from this map fall back to ``agent.capitalize()`` at the
# call sites in this module.
# NOTE(review): said to mirror the STAGE_AUTHORS roles, which are defined
# outside this file — confirm the two stay in sync.
AGENT_DISPLAY = {
    "analyst": "Analyst",
    "architect": "Architect",
    "developer": "Developer",
    "reviewer": "Reviewer",
    "tester": "Tester",
    "deployer": "Deployer",
}
|
|
|
|
|
|
def usage_comment(agent: str, usage: dict | None) -> str:
    """Build the one-line comment posted when an agent finishes.

    The line has the shape
    ``<icon> <Name> <Russian word for "ready"> · <in> in / <out> out · <cost>``,
    for example a laptop emoji followed by
    ``Developer ... · 45.2k in / 12.1k out · $0.21``.

    Args:
        agent: Lowercase agent identifier.  Unknown agents are shown
            capitalised and with a check-mark icon.
        usage: Dict with ``input_tokens`` / ``output_tokens`` / ``cost_usd``.
            ``None`` or missing keys are rendered as zeros.
    """
    stats = usage or {}
    label = AGENT_DISPLAY.get(agent, agent.capitalize())
    emoji = AGENT_ICON.get(agent, "\u2705")
    tokens_in = fmt_tokens(stats.get("input_tokens"))
    tokens_out = fmt_tokens(stats.get("output_tokens"))
    cost = fmt_cost(stats.get("cost_usd"))
    return (
        f"{emoji} {label} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
        f"{tokens_in} in / {tokens_out} out \u00b7 {cost}"
    )
|
|
|
|
|
|
# Emoji shown in front of each agent's finish comment, keyed by the lowercase
# agent identifier. usage_comment() falls back to "\u2705" (check mark) for
# agents that are not listed here.
AGENT_ICON = {
    "analyst": "\U0001f50d",  # magnifying glass tilted left
    "architect": "\U0001f4d0",  # triangular ruler
    "developer": "\U0001f4bb",  # laptop
    "reviewer": "\U0001f50e",  # magnifying glass tilted right
    "tester": "\U0001f9ea",  # test tube
    "deployer": "\U0001f680",  # rocket
}
|
|
|
|
|
|
def task_usage_summary(task_id: int) -> dict:
    """Sum the recorded usage of every agent run that belongs to a task.

    Rows of ``agent_runs`` with the given ``task_id`` are grouped by agent and
    ordered by agent name; NULL sums are coalesced to zero in SQL.

    Args:
        task_id: Value matched against ``agent_runs.task_id``.

    Returns:
        ``{"total_in", "total_out", "total_cost", "per_agent"}`` where
        ``per_agent`` is a list of ``(agent, in, out, cost)`` tuples and the
        totals are the sums over that list.
    """
    query = (
        "SELECT agent, "
        "COALESCE(SUM(input_tokens),0), "
        "COALESCE(SUM(output_tokens),0), "
        "COALESCE(SUM(cost_usd),0.0) "
        "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent"
    )
    conn = get_db()
    try:
        rows = conn.execute(query, (task_id,)).fetchall()
    finally:
        conn.close()

    per_agent = [
        (row[0], int(row[1]), int(row[2]), float(row[3])) for row in rows
    ]
    return {
        "total_in": sum(entry[1] for entry in per_agent),
        "total_out": sum(entry[2] for entry in per_agent),
        "total_cost": sum(entry[3] for entry in per_agent),
        "per_agent": per_agent,
    }
|
|
|
|
|
|
def task_summary_comment(task_id: int) -> str:
    """Compose the Deployer's end-of-task usage summary (Feature 4, variant B).

    The first line is a Russian-language total for the task (input tokens,
    output tokens, cost); it is followed by one bullet line per agent in the
    order returned by ``task_usage_summary``.  Lines are joined with newlines.
    """
    summary = task_usage_summary(task_id)
    header = (
        f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
        f"{fmt_tokens(summary['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
        f"{fmt_tokens(summary['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
        f"{fmt_cost(summary['total_cost'])}"
    )
    bullets = [
        f"\u2022 {AGENT_DISPLAY.get(agent, agent.capitalize())}: "
        f"{fmt_tokens(tokens_in)} in / {fmt_tokens(tokens_out)} out \u00b7 {fmt_cost(cost)}"
        for agent, tokens_in, tokens_out, cost in summary["per_agent"]
    ]
    return "\n".join([header, *bullets])
|