"""Feature 4: token / cost accounting tests. Covers: * parse_usage_from_text on a REAL claude --output-format json result blob (captured live from CLI 2.1.142), including a leading text line. * parse on garbage / missing JSON -> None (never raises). * record_usage writes the columns; NULLs when usage is None. * fmt_tokens / fmt_cost formatting. * usage_comment string format. * task_usage_summary / task_summary_comment aggregate over agent_runs. DB is an isolated temp file; no network or subprocess. """ import os import tempfile os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_usage.db") os.environ["ORCH_DB_PATH"] = _test_db import pytest # noqa: E402 from src import db as db_module # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import usage as U # noqa: E402 # Real claude --output-format json result object (captured from CLI 2.1.142). REAL_RESULT_JSON = ( '{"type":"result","subtype":"success","is_error":false,"duration_ms":1795,' '"num_turns":1,"result":"Hi!","session_id":"abc",' '"total_cost_usd":0.0560175,' '"usage":{"input_tokens":45231,"cache_creation_input_tokens":7418,' '"cache_read_input_tokens":18500,"output_tokens":12100,' '"service_tier":"standard"},' '"modelUsage":{"claude-opus-4-7":{"inputTokens":6,"outputTokens":7}},' '"permission_denials":[]}' ) @pytest.fixture(autouse=True) def setup_db(monkeypatch): # get_db() reads settings.db_path live; pin it to our isolated DB. 
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False) if os.path.exists(_test_db): os.unlink(_test_db) init_db() yield if os.path.exists(_test_db): os.unlink(_test_db) # --------------------------------------------------------------------------- # # parsing # --------------------------------------------------------------------------- # def test_parse_real_result_json(): u = U.parse_usage_from_text(REAL_RESULT_JSON) assert u is not None assert u["input_tokens"] == 45231 assert u["output_tokens"] == 12100 assert u["cache_read_tokens"] == 18500 # FIX 2: cache_creation slice must now be parsed (was dropped before). assert u["cache_creation_tokens"] == 7418 assert abs(u["cost_usd"] - 0.0560175) < 1e-9 def test_parse_cache_creation_present(): u = U.parse_usage_from_text(REAL_RESULT_JSON) assert u["cache_creation_tokens"] == 7418 def test_parse_cache_creation_missing_defaults_zero(): blob = ( '{"total_cost_usd":0.01,' '"usage":{"input_tokens":10,"output_tokens":5,' '"cache_read_input_tokens":100}}' ) u = U.parse_usage_from_text(blob) assert u["cache_creation_tokens"] == 0 assert u["cache_read_tokens"] == 100 def test_parse_with_leading_text(): """The agent may print text before the trailing JSON; we still find it.""" text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON u = U.parse_usage_from_text(text) assert u is not None assert u["input_tokens"] == 45231 assert u["output_tokens"] == 12100 def test_parse_garbage_returns_none(): assert U.parse_usage_from_text("not json at all { broken") is None assert U.parse_usage_from_text("") is None assert U.parse_usage_from_text(None) is None def test_parse_json_without_usage_returns_none(): assert U.parse_usage_from_text('{"hello":"world"}') is None def test_parse_from_log_missing_file_returns_none(): assert U.parse_usage_from_log("/no/such/file.log") is None # --------------------------------------------------------------------------- # # record_usage # 
# --------------------------------------------------------------------------- #
def _new_run(agent="developer", task_id=1):
    """Insert a bare agent_runs row and return its id.

    Args:
        agent: value stored in the ``agent`` column.
        task_id: value stored in the ``task_id`` column.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)",
            (task_id, agent),
        )
        rid = cur.lastrowid
        conn.commit()
    finally:
        # Close even when the INSERT/commit raises so the handle never leaks.
        conn.close()
    return rid


def test_record_usage_writes_columns():
    """record_usage persists every parsed usage field on the run row."""
    rid = _new_run()
    u = U.parse_usage_from_text(REAL_RESULT_JSON)
    U.record_usage(rid, u)
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT input_tokens, output_tokens, cache_read_tokens, "
            "cache_creation_tokens, cost_usd "
            "FROM agent_runs WHERE id=?",
            (rid,),
        ).fetchone()
    finally:
        conn.close()
    assert row["input_tokens"] == 45231
    assert row["output_tokens"] == 12100
    assert row["cache_read_tokens"] == 18500
    # FIX 2: cache_creation column is now persisted.
    assert row["cache_creation_tokens"] == 7418
    assert abs(row["cost_usd"] - 0.0560175) < 1e-9


def test_record_usage_none_writes_nulls():
    """record_usage(rid, None) leaves the usage columns NULL."""
    rid = _new_run()
    U.record_usage(rid, None)  # must not raise
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT input_tokens, cost_usd FROM agent_runs WHERE id=?",
            (rid,),
        ).fetchone()
    finally:
        conn.close()
    assert row["input_tokens"] is None
    assert row["cost_usd"] is None


# --------------------------------------------------------------------------- #
# formatting
# --------------------------------------------------------------------------- #
def test_fmt_tokens():
    """fmt_tokens renders plain, k and M forms; None renders as "0"."""
    assert U.fmt_tokens(6) == "6"
    assert U.fmt_tokens(1234) == "1.2k"
    assert U.fmt_tokens(45231) == "45.2k"
    assert U.fmt_tokens(2_500_000) == "2.5M"
    assert U.fmt_tokens(None) == "0"


def test_fmt_cost():
    """fmt_cost renders dollars with two decimals; None renders as "$0.00"."""
    assert U.fmt_cost(0.21) == "$0.21"
    assert U.fmt_cost(0.0560175) == "$0.06"
    assert U.fmt_cost(None) == "$0.00"


def test_usage_comment_format():
    """Comment shows capitalised agent, in/out token counts and the cost."""
    # No cache -> in_total == input_tokens, no cached breakdown shown.
    u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
    c = U.usage_comment("developer", u)
    assert "Developer" in c
    assert "45.2k in" in c
    assert "cached" not in c
    assert "12.1k out" in c
    assert "$0.21" in c


def test_usage_comment_shows_full_input_with_cached():
    """FIX 2: in = input + cache_read + cache_creation, with cached breakdown."""
    u = {
        "input_tokens": 81,
        "cache_read_tokens": 8_400_000,
        "cache_creation_tokens": 100_000,
        "output_tokens": 45_800,
        "cost_usd": 7.29,
    }
    c = U.usage_comment("developer", u)
    # total in = 8_500_081 -> 8.5M ; cached = 8_500_000 -> 8.5M
    assert "8.5M in (8.5M cached)" in c
    assert "45.8k out" in c
    assert "$7.29" in c


def test_usage_comment_no_cached_when_zero():
    """Explicit zero cache counts do not produce a cached breakdown."""
    u = {
        "input_tokens": 1234,
        "cache_read_tokens": 0,
        "cache_creation_tokens": 0,
        "output_tokens": 50,
        "cost_usd": 0.01,
    }
    c = U.usage_comment("developer", u)
    assert "1.2k in" in c
    assert "cached" not in c


# --------------------------------------------------------------------------- #
# FIX 4: per-agent artifact links in finish comments
# --------------------------------------------------------------------------- #
def _ctx():
    """Return the repo/branch/work-item keyword arguments shared by link tests."""
    return {
        "repo": "enduro-trails",
        "branch": "feature/ET-012-x",
        "work_item_id": "ET-012",
    }


def test_usage_comment_reviewer_links_review_doc():
    """Reviewer comment mentions the review doc and the work item id."""
    c = U.usage_comment("reviewer", {"input_tokens": 5}, **_ctx())
    assert "12-review.md" in c
    assert "ET-012" in c


def test_usage_comment_tester_links_test_report():
    """Tester comment mentions the test report."""
    c = U.usage_comment("tester", {"input_tokens": 5}, **_ctx())
    assert "13-test-report.md" in c


def test_usage_comment_deployer_links_deploy_log():
    """Deployer comment mentions the deploy log."""
    c = U.usage_comment("deployer", {"input_tokens": 5}, **_ctx())
    assert "14-deploy-log.md" in c


def test_usage_comment_developer_links_pr_and_branch():
    """Developer comment mentions the pull request path and the branch."""
    c = U.usage_comment("developer", {"input_tokens": 5}, pr_number=7, **_ctx())
    assert "pulls/7" in c
    assert "feature/ET-012-x" in c


def test_usage_comment_architect_links_adr():
    """Architect comment mentions the ADR artifact."""
    c = U.usage_comment("architect", {"input_tokens": 5}, **_ctx())
    assert "06-adr" in c


def test_usage_comment_no_links_without_context():
    """Without repo/branch context, no links are appended (no crash)."""
    c = U.usage_comment("reviewer", {"input_tokens": 5})
    assert "12-review.md" not in c
    assert "http" not in c


# --------------------------------------------------------------------------- #
# task summary
# --------------------------------------------------------------------------- #
def test_task_summary_aggregates_over_agents():
    """Totals and the per-agent list cover every run recorded for the task."""
    # two runs for the same task: developer + tester
    runs = [("developer", 1000, 200, 0.10), ("tester", 500, 100, 0.05)]
    for agent, ti, to, cost in runs:
        rid = _new_run(agent=agent, task_id=42)
        U.record_usage(
            rid,
            {
                "input_tokens": ti,
                "output_tokens": to,
                "cache_read_tokens": 0,
                "cost_usd": cost,
            },
        )
    s = U.task_usage_summary(42)
    assert s["total_in"] == 1500
    assert s["total_out"] == 300
    assert abs(s["total_cost"] - 0.15) < 1e-9
    agents = {a for a, *_ in s["per_agent"]}
    assert agents == {"developer", "tester"}
    comment = U.task_summary_comment(42)
    assert "1.5k" in comment  # total in
    assert "$0.15" in comment  # total cost
    assert "Developer" in comment
    assert "Tester" in comment


def test_task_summary_sums_all_three_input_components():
    """FIX 2: total_in = SUM(input + cache_read + cache_creation); total_cached too."""
    rid = _new_run(agent="developer", task_id=77)
    U.record_usage(rid, {
        "input_tokens": 100,
        "cache_read_tokens": 2000,
        "cache_creation_tokens": 900,
        "output_tokens": 50,
        "cost_usd": 0.10,
    })
    rid2 = _new_run(agent="tester", task_id=77)
    U.record_usage(rid2, {
        "input_tokens": 10,
        "cache_read_tokens": 500,
        "cache_creation_tokens": 0,
        "output_tokens": 5,
        "cost_usd": 0.05,
    })
    s = U.task_usage_summary(77)
    # total_in = (100+2000+900) + (10+500+0) = 3510
    assert s["total_in"] == 3510
    # total_cached = (2000+900) + (500+0) = 3400
    assert s["total_cached"] == 3400
    assert s["total_out"] == 55
    comment = U.task_summary_comment(77)
    assert "cached" in comment


def test_task_summary_handles_null_cache_creation():
    """Pre-existing rows (NULL cache_creation) must not break aggregation."""
    rid = _new_run(agent="developer", task_id=88)
    conn = get_db()
    try:
        conn.execute(
            "UPDATE agent_runs SET input_tokens=100, cache_read_tokens=200, "
            "cache_creation_tokens=NULL, output_tokens=10, cost_usd=0.01 WHERE id=?",
            (rid,),
        )
        conn.commit()
    finally:
        # Release the handle even if the UPDATE fails.
        conn.close()
    s = U.task_usage_summary(88)  # must not raise
    assert s["total_in"] == 300  # 100 + 200 + (NULL->0)
    assert s["total_cached"] == 200