Feature 4. claude is now launched with --output-format json; the run-log trailing result JSON is parsed (defensively, never fatal) for usage + total_cost_usd. New idempotent ALTERs add input_tokens/output_tokens/cache_read_tokens/cost_usd to agent_runs; the launcher monitor records usage per run, posts a per-agent finish comment under that agent bot (e.g. Developer gotov · 45.2k in / 12.1k out · $0.21), and the deployer posts an end-of-task summary (SUM over agent_runs GROUP BY agent) on done. New src/usage.py holds parse/format/record/summary helpers; test_usage.py covers parsing a real CLI JSON blob, NULL-on-garbage, recording, formatting, and the per-task aggregate.
177 lines
6.0 KiB
Python
177 lines
6.0 KiB
Python
"""Feature 4: token / cost accounting tests.
|
|
|
|
Covers:
|
|
* parse_usage_from_text on a REAL claude --output-format json result blob
|
|
(captured live from CLI 2.1.142), including a leading text line.
|
|
* parse on garbage / missing JSON -> None (never raises).
|
|
* record_usage writes the columns; NULLs when usage is None.
|
|
* fmt_tokens / fmt_cost formatting.
|
|
* usage_comment string format.
|
|
* task_usage_summary / task_summary_comment aggregate over agent_runs.
|
|
|
|
DB is an isolated temp file; no network or subprocess.
|
|
"""
|
|
|
|
import os
import tempfile

# Environment is prepared before the project imports further down (those
# imports carry `noqa: E402` for this reason).
# NOTE(review): presumably src.* reads these variables at import time — confirm.
# setdefault keeps any token the developer already exported.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")

# Dedicated SQLite file for this test module, placed in the system temp dir.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db")
os.environ["ORCH_DB_PATH"] = _test_db
|
|
|
|
import pytest # noqa: E402
|
|
|
|
from src import db as db_module # noqa: E402
|
|
from src.db import init_db, get_db # noqa: E402
|
|
from src import usage as U # noqa: E402
|
|
|
|
|
|
# Real claude --output-format json result object (captured from CLI 2.1.142).
# Kept as adjacent string literals so the blob stays byte-for-byte what the
# CLI printed while still fitting the line-length limit.
REAL_RESULT_JSON = (
    '{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,'
    '"num_turns":1,"result":"Hi!","session_id":"abc",'
    '"total_cost_usd":0.0560175,'
    '"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,'
    '"cache_read_input_tokens":18500,"output_tokens":12100,'
    '"service_tier":"standard"},'
    '"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},'
    '"permission_denials":[]}'
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised, isolated database file.

    Runs automatically for each test: points the db module's settings at the
    test DB path, removes any leftover file, calls ``init_db()``, and removes
    the file again once the test has finished.
    """

    def _remove_db():
        # EAFP: a missing file is the normal case on a clean run.
        try:
            os.unlink(_test_db)
        except FileNotFoundError:
            pass

    # get_db() reads settings.db_path live; pin it to our isolated DB.
    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _remove_db()
    init_db()
    yield
    _remove_db()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# parsing
|
|
# --------------------------------------------------------------------------- #
|
|
def test_parse_real_result_json():
    """A bare CLI result blob yields token counts and cost.

    The expected values are the ones embedded in REAL_RESULT_JSON: the
    ``usage`` token counters and ``total_cost_usd``.
    """
    usage = U.parse_usage_from_text(REAL_RESULT_JSON)

    assert usage is not None
    assert usage["input_tokens"] == 45231
    assert usage["output_tokens"] == 12100
    assert usage["cache_read_tokens"] == 18500
    # Float comparison with a tolerance rather than exact equality.
    assert abs(usage["cost_usd"] - 0.0560175) < 1e-9
|
|
|
|
|
|
def test_parse_with_leading_text():
    """The agent may print text before the trailing JSON; we still find it."""
    log_text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON

    usage = U.parse_usage_from_text(log_text)

    assert usage is not None
    assert usage["input_tokens"] == 45231
    assert usage["output_tokens"] == 12100
|
|
|
|
|
|
def test_parse_garbage_returns_none():
    """Malformed, empty, or None input returns None instead of raising."""
    assert U.parse_usage_from_text("not json at all { broken") is None
    assert U.parse_usage_from_text("") is None
    assert U.parse_usage_from_text(None) is None
|
|
|
|
|
|
def test_parse_json_without_usage_returns_none():
    """Well-formed JSON that carries no usage data is treated as 'no usage'."""
    assert U.parse_usage_from_text('{"hello":"world"}') is None
|
|
|
|
|
|
def test_parse_from_log_missing_file_returns_none():
    """A log path that does not exist yields None rather than an exception."""
    assert U.parse_usage_from_log("/no/such/file.log") is None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# record_usage
|
|
# --------------------------------------------------------------------------- #
|
|
def _new_run(agent="developer", task_id=1):
    """Insert a minimal ``agent_runs`` row and return its id.

    Args:
        agent: value stored in the ``agent`` column.
        task_id: value stored in the ``task_id`` column.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db()
    try:
        cursor = conn.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)",
            (task_id, agent),
        )
        run_id = cursor.lastrowid
        conn.commit()
    finally:
        # Close even when the INSERT/commit fails so the connection is not leaked.
        conn.close()
    return run_id
|
|
|
|
|
|
def test_record_usage_writes_columns():
    """record_usage persists all four usage columns on the run's row."""
    run_id = _new_run()
    usage = U.parse_usage_from_text(REAL_RESULT_JSON)

    U.record_usage(run_id, usage)

    conn = get_db()
    try:
        row = conn.execute(
            "SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
            "FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
    finally:
        # Release the connection even if the query itself blows up.
        conn.close()

    assert row["input_tokens"] == 45231
    assert row["output_tokens"] == 12100
    assert row["cache_read_tokens"] == 18500
    assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
|
|
|
|
|
def test_record_usage_none_writes_nulls():
    """Recording ``None`` usage is harmless and leaves the columns NULL."""
    run_id = _new_run()

    U.record_usage(run_id, None)  # must not raise

    conn = get_db()
    try:
        row = conn.execute(
            "SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
    finally:
        # Release the connection even if the query itself blows up.
        conn.close()

    assert row["input_tokens"] is None
    assert row["cost_usd"] is None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# formatting
|
|
# --------------------------------------------------------------------------- #
|
|
def test_fmt_tokens():
    """fmt_tokens: plain digits when small, one-decimal k / M suffixes, None -> "0"."""
    assert U.fmt_tokens(6) == "6"
    assert U.fmt_tokens(1234) == "1.2k"
    assert U.fmt_tokens(45231) == "45.2k"
    assert U.fmt_tokens(2_500_000) == "2.5M"
    assert U.fmt_tokens(None) == "0"
|
|
|
|
|
|
def test_fmt_cost():
    """fmt_cost: dollar sign, two decimals (rounded), None -> "$0.00"."""
    assert U.fmt_cost(0.21) == "$0.21"
    assert U.fmt_cost(0.0560175) == "$0.06"
    assert U.fmt_cost(None) == "$0.00"
|
|
|
|
|
|
def test_usage_comment_format():
    """The per-agent comment names the agent (capitalised) and shows in/out/cost."""
    usage = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}

    comment = U.usage_comment("developer", usage)

    # Substring checks only: the exact separators/wording are not pinned here.
    assert "Developer" in comment
    assert "45.2k in" in comment
    assert "12.1k out" in comment
    assert "$0.21" in comment
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# task summary
|
|
# --------------------------------------------------------------------------- #
|
|
def test_task_summary_aggregates_over_agents():
    """Per-task summary totals tokens and cost across every agent's runs."""
    # two runs for the same task: developer + tester
    runs = [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]
    for agent, tokens_in, tokens_out, cost in runs:
        run_id = _new_run(agent=agent, task_id=42)
        U.record_usage(
            run_id,
            {
                "input_tokens": tokens_in,
                "output_tokens": tokens_out,
                "cache_read_tokens": 0,
                "cost_usd": cost,
            },
        )

    summary = U.task_usage_summary(42)
    assert summary["total_in"] == 1500
    assert summary["total_out"] == 300
    assert abs(summary["total_cost"] - 0.15) < 1e-9
    # Each per_agent entry is a sequence whose first element is the agent name.
    agents = {name for name, *_ in summary["per_agent"]}
    assert agents == {"developer", "tester"}

    comment = U.task_summary_comment(42)
    assert "1.5k" in comment  # total in
    assert "$0.15" in comment  # total cost
    assert "Developer" in comment
    assert "Tester" in comment
|