"""Feature 4: token / cost accounting for agent runs. claude --output-format json emits a single result JSON object at the end of the run log with fields: total_cost_usd usage.input_tokens / output_tokens / cache_read_input_tokens / cache_creation_input_tokens modelUsage, num_turns, duration_ms This module parses that JSON out of a (text-or-json) run log, records the usage on the agent_runs row, formats a Plane comment for the finishing agent, and builds the per-task summary the Deployer posts on deploy/done. Everything here is defensive: a missing/garbled JSON never raises \u2014 we record NULL/0 and log a warning so a broken agent run can't crash the monitor. """ import json import logging from .db import get_db logger = logging.getLogger("orchestrator.usage") def parse_usage_from_text(text: str) -> dict | None: """Extract the claude result-JSON usage from a run log's text. The log may contain plain text before/after the JSON; with --output-format json the JSON is the final object. We scan for the LAST top-level '{' ... '}' that parses and carries usage/total_cost_usd. Returns a normalised dict {input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd} (ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found. """ if not text: return None candidate = _extract_last_json_object(text) if candidate is None: return None usage = candidate.get("usage") or {} if not isinstance(usage, dict): usage = {} cost = candidate.get("total_cost_usd") if cost is None: cost = candidate.get("cost_usd") # If there is neither a usage block nor a cost, this isn't a result object. if not usage and cost is None: return None def _int(v): try: return int(v) except (TypeError, ValueError): return 0 def _float(v): try: return float(v) except (TypeError, ValueError): return 0.0 return { "input_tokens": _int(usage.get("input_tokens")), "output_tokens": _int(usage.get("output_tokens")), "cache_read_tokens": _int( usage.get("cache_read_input_tokens", usage.get("cache_read_tokens")) ), # The cache-CREATION slice (writing new cache entries) is part of the # REAL input and used to be dropped on the floor. Persist it so the # "X in" figure reflects the full prompt size, not just fresh tokens. "cache_creation_tokens": _int( usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens")) ), "cost_usd": _float(cost), # Telegram live tracker: the model the run actually used. claude # --output-format json reports it under modelUsage (a dict keyed by the # full model id) and/or a top-level "model" field. We keep the FULL name # here; short_model_name() trims it for the tracker. None when unknown. "model": _extract_model(candidate), } def _extract_model(candidate: dict) -> str | None: """Best-effort: pull the model id out of a claude result JSON object. Prefers modelUsage (a dict keyed by full model ids, e.g. {"claude-opus-4-8": {...}}) and returns the key with the most output tokens; falls back to a top-level "model" string. Never raises -> None. """ try: mu = candidate.get("modelUsage") if isinstance(mu, dict) and mu: def _out(v): try: return int((v or {}).get("outputTokens", 0)) except (TypeError, ValueError, AttributeError): return 0 best = max(mu.items(), key=lambda kv: _out(kv[1])) if best and best[0]: return str(best[0]) model = candidate.get("model") if isinstance(model, str) and model: return model except Exception: pass return None def short_model_name(full: str | None) -> str: """Trim a full model id to a short tag for the tracker. 'tokenator/claude-opus-4-8' -> 'opus-4-8' 'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6' 'claude-opus-4-8' -> 'opus-4-8' Returns '' when full is falsy so callers can omit the ' · ' suffix. """ if not full: return "" name = str(full).strip() # Drop any provider prefix up to and including the last '/'. if "/" in name: name = name.rsplit("/", 1)[-1] # Drop a leading 'claude-' marketing prefix. if name.startswith("claude-"): name = name[len("claude-"):] return name def _extract_last_json_object(text: str) -> dict | None: """Return the last balanced top-level JSON object in `text` that parses. Scans from the end for '}' and walks back to the matching '{' using a depth counter (string-aware), trying json.loads on each candidate. Robust to log lines or text emitted before the JSON. """ # Fast path: the whole stripped text is the JSON. stripped = text.strip() try: obj = json.loads(stripped) if isinstance(obj, dict): return obj except (ValueError, TypeError): pass # Otherwise find the last balanced { ... } block. end = len(text) while True: close = text.rfind("}", 0, end) if close == -1: return None depth = 0 in_str = False esc = False start = None for i in range(close, -1, -1): ch = text[i] if in_str: if esc: esc = False elif ch == "\\": esc = True elif ch == '"': in_str = False continue if ch == '"': in_str = True elif ch == "}": depth += 1 elif ch == "{": depth -= 1 if depth == 0: start = i break if start is not None: blob = text[start:close + 1] try: obj = json.loads(blob) if isinstance(obj, dict): return obj except (ValueError, TypeError): pass end = close # keep scanning earlier in the text def parse_usage_from_log(path: str) -> dict | None: """Read a run log file and parse usage from it. Never raises.""" try: with open(path, "r", encoding="utf-8", errors="replace") as f: return parse_usage_from_text(f.read()) except OSError as e: logger.warning(f"parse_usage_from_log: cannot read {path}: {e}") return None def record_usage(run_id: int, usage: dict | None): """Write parsed usage onto the agent_runs row. NULLs if usage is None.""" if usage is None: logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs") usage = {} conn = get_db() try: conn.execute( "UPDATE agent_runs SET input_tokens=?, output_tokens=?, " "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, " "model=COALESCE(?, model) WHERE id=?", ( usage.get("input_tokens"), usage.get("output_tokens"), usage.get("cache_read_tokens"), usage.get("cache_creation_tokens"), usage.get("cost_usd"), usage.get("model"), run_id, ), ) conn.commit() finally: conn.close() def fmt_tokens(n) -> str: """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.""" try: n = int(n or 0) except (TypeError, ValueError): n = 0 if n >= 1_000_000: return f"{n / 1_000_000:.1f}M" if n >= 1_000: return f"{n / 1_000:.1f}k" return str(n) def fmt_cost(c) -> str: """Format USD cost with 2 decimals: '$0.21'.""" try: c = float(c or 0.0) except (TypeError, ValueError): c = 0.0 return f"${c:.2f}" # Pretty agent names for comments (mirrors STAGE_AUTHORS roles). AGENT_DISPLAY = { "analyst": "Analyst", "architect": "Architect", "developer": "Developer", "reviewer": "Reviewer", "tester": "Tester", "deployer": "Deployer", } def _input_total(usage: dict) -> int: """FULL input = fresh input + cache-read + cache-creation tokens.""" def _i(k): try: return int(usage.get(k) or 0) except (TypeError, ValueError): return 0 return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens") def _cached_total(usage: dict) -> int: """Cached portion of the input = cache-read + cache-creation tokens.""" def _i(k): try: return int(usage.get(k) or 0) except (TypeError, ValueError): return 0 return _i("cache_read_tokens") + _i("cache_creation_tokens") def fmt_in(usage: dict) -> str: """Render the input figure as full total with a cached breakdown. '8.5M in (8.4M cached)' when there is a cache; '45.2k in' when cached==0. """ total = _input_total(usage) cached = _cached_total(usage) if cached > 0: return f"{fmt_tokens(total)} in ({fmt_tokens(cached)} cached)" return f"{fmt_tokens(total)} in" def usage_comment( agent: str, usage: dict | None, repo: str | None = None, branch: str | None = None, work_item_id: str | None = None, pr_number=None, ) -> str: """Build the per-agent finish comment, e.g. '\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 8.5M in (8.4M cached) / 45.8k out \u00b7 $7.29'. When repo/branch/work_item_id are supplied, the agent's artifact link(s) are appended (BUG: only analyst used to link its docs). Missing artifacts are silently skipped — link building never raises. """ usage = usage or {} name = AGENT_DISPLAY.get(agent, agent.capitalize()) icon = AGENT_ICON.get(agent, "\u2705") line = ( f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 " f"{fmt_in(usage)} / " f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 " f"{fmt_cost(usage.get('cost_usd'))}" ) links = artifact_links(agent, repo, branch, work_item_id, pr_number) if links: line += "\n" + "\n".join(links) return line # Per-agent artifact file under docs/work-items/{wid}/ (architect/developer use # special handling for ADR dirs / PR links, see artifact_links()). AGENT_ARTIFACT = { "reviewer": ("Review", "12-review.md"), "tester": ("Test report", "13-test-report.md"), "deployer": ("Deploy log", "14-deploy-log.md"), } def artifact_links( agent: str, repo: str | None, branch: str | None, work_item_id: str | None, pr_number=None, ) -> list[str]: """Markdown link(s) to the finishing agent's artifact(s) in Gitea. Uses gitea_public_url (falls back to gitea_url) for clickable links, mirroring the analyst doc links. Returns [] (never raises) when there is nothing to link or the required context is missing. analyst is intentionally NOT handled here — its richer doc list lives in stage_engine._build_analyst_ready_comment. """ try: from .config import settings owner = getattr(settings, "gitea_owner", "admin") base = ( getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "") ).rstrip("/") if not base or not repo: return [] links: list[str] = [] if agent == "developer": if branch: links.append( f"\U0001f4c2 [Branch {branch}]({base}/{owner}/{repo}/src/branch/{branch})" ) if pr_number: links.append( f"\U0001f517 [PR #{pr_number}]({base}/{owner}/{repo}/pulls/{pr_number})" ) return links if agent == "architect": if branch and work_item_id: adr_dir = ( f"{base}/{owner}/{repo}/src/branch/{branch}/" f"docs/work-items/{work_item_id}/06-adr" ) links.append(f"\U0001f4d0 [ADR]({adr_dir})") return links spec = AGENT_ARTIFACT.get(agent) if spec and branch and work_item_id: label, fname = spec href = ( f"{base}/{owner}/{repo}/src/branch/{branch}/" f"docs/work-items/{work_item_id}/{fname}" ) links.append(f"\U0001f4c4 [{label}]({href})") return links except Exception: return [] AGENT_ICON = { "analyst": "\U0001f50d", "architect": "\U0001f4d0", "developer": "\U0001f4bb", "reviewer": "\U0001f50e", "tester": "\U0001f9ea", "deployer": "\U0001f680", } def task_usage_summary(task_id: int) -> dict: """Aggregate agent_runs usage for a task. total_in counts the FULL input (input + cache_read + cache_creation), and total_cached counts the cached portion (cache_read + cache_creation). COALESCE(...,0) keeps pre-existing rows (NULL cache_creation) from breaking. Returns {total_in, total_cached, total_out, total_cost, per_agent: [(agent, in, cached, out, cost), ...]}. """ conn = get_db() try: rows = conn.execute( "SELECT agent, " "COALESCE(SUM(input_tokens),0) " " + COALESCE(SUM(cache_read_tokens),0) " " + COALESCE(SUM(cache_creation_tokens),0), " "COALESCE(SUM(cache_read_tokens),0) " " + COALESCE(SUM(cache_creation_tokens),0), " "COALESCE(SUM(output_tokens),0), " "COALESCE(SUM(cost_usd),0.0) " "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent", (task_id,), ).fetchall() finally: conn.close() per_agent = [(r[0], int(r[1]), int(r[2]), int(r[3]), float(r[4])) for r in rows] total_in = sum(r[1] for r in per_agent) total_cached = sum(r[2] for r in per_agent) total_out = sum(r[3] for r in per_agent) total_cost = sum(r[4] for r in per_agent) return { "total_in": total_in, "total_cached": total_cached, "total_out": total_out, "total_cost": total_cost, "per_agent": per_agent, } def task_summary_comment(task_id: int) -> str: """Build the Deployer end-of-task summary comment (Feature 4, variant B).""" s = task_usage_summary(task_id) cached = s.get("total_cached", 0) head_in = ( f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434 ({fmt_tokens(cached)} cached)" if cached > 0 else f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434" ) lines = [ f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: " f"{head_in} / " f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 " f"{fmt_cost(s['total_cost'])}" ] for agent, ti, tc, to, cost in s["per_agent"]: name = AGENT_DISPLAY.get(agent, agent.capitalize()) in_str = ( f"{fmt_tokens(ti)} in ({fmt_tokens(tc)} cached)" if tc > 0 else f"{fmt_tokens(ti)} in" ) lines.append( f"\u2022 {name}: {in_str} / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}" ) return "\n".join(lines)