"""Feature 4 + ORCH-016: token / cost accounting and unified status comments.

claude --output-format json emits a single result JSON object at the end of
the run log with fields:
    total_cost_usd
    usage.input_tokens / output_tokens / cache_read_input_tokens /
        cache_creation_input_tokens
    modelUsage, num_turns, duration_ms

This module parses that JSON out of a (text-or-json) run log, records the
usage on the agent_runs row, and builds:
  - per-agent status comments via build_status_comment(...) — the ORCH-016
    unified format replacing the legacy usage_comment(...) and the analyst-
    only stage_engine._build_analyst_ready_comment(...). Every agent now
    flows through the same hot path.
  - per-task summary the Deployer posts on deploy/done.

Everything here is defensive: a missing/garbled JSON never raises — we record
NULL/0 and log a warning so a broken agent run can't crash the monitor. The
status-comment hot path likewise NEVER raises (self-hosting risk R-1).
"""

import json
import logging

from .db import get_db

logger = logging.getLogger("orchestrator.usage")


def parse_usage_from_text(text: str) -> dict | None:
    """Extract the claude result-JSON usage from a run log's text.

    The log may contain plain text before/after the JSON; with
    --output-format json the JSON is the final object. The LAST JSON object
    in the text that parses is taken as the candidate; it is rejected when
    it carries neither a ``usage`` block nor a cost.

    Returns a normalised dict with keys ``input_tokens``, ``output_tokens``,
    ``cache_read_tokens``, ``cache_creation_tokens`` (ints, missing -> 0),
    ``cost_usd`` (float, missing -> 0.0) and ``model`` (str or None), or
    None if no usable JSON was found.
    """
    if not text:
        return None
    candidate = _extract_last_json_object(text)
    if candidate is None:
        return None

    usage = candidate.get("usage") or {}
    if not isinstance(usage, dict):
        usage = {}
    cost = candidate.get("total_cost_usd")
    if cost is None:
        cost = candidate.get("cost_usd")

    # If there is neither a usage block nor a cost, this isn't a result object.
    if not usage and cost is None:
        return None

    def _int(v):
        # OverflowError: json.loads accepts Infinity, and int(inf) overflows.
        try:
            return int(v)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _float(v):
        # OverflowError: an int too large to be represented as a float.
        try:
            return float(v)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    return {
        "input_tokens": _int(usage.get("input_tokens")),
        "output_tokens": _int(usage.get("output_tokens")),
        "cache_read_tokens": _int(
            usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
        ),
        # The cache-CREATION slice (writing new cache entries) is part of the
        # REAL input and used to be dropped on the floor. Persist it so the
        # "X in" figure reflects the full prompt size, not just fresh tokens.
        "cache_creation_tokens": _int(
            usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
        ),
        "cost_usd": _float(cost),
        # Telegram live tracker: the model the run actually used. claude
        # --output-format json reports it under modelUsage (a dict keyed by the
        # full model id) and/or a top-level "model" field. We keep the FULL name
        # here; short_model_name() trims it for the tracker. None when unknown.
        "model": _extract_model(candidate),
    }


def _extract_model(candidate: dict) -> str | None:
    """Best-effort: pull the model id out of a claude result JSON object.

    Prefers modelUsage (a dict keyed by full model ids, e.g.
    {"claude-opus-4-8": {...}}) and returns the key with the most output
    tokens; falls back to a top-level "model" string. Never raises -> None.
    """
    try:
        mu = candidate.get("modelUsage")
        if isinstance(mu, dict) and mu:
            def _out(v):
                # A malformed per-model entry simply ranks as zero output.
                try:
                    return int((v or {}).get("outputTokens", 0))
                except (TypeError, ValueError, AttributeError):
                    return 0

            best = max(mu.items(), key=lambda kv: _out(kv[1]))
            if best and best[0]:
                return str(best[0])
        model = candidate.get("model")
        if isinstance(model, str) and model:
            return model
    except Exception:
        # Deliberate best-effort: the model tag is cosmetic and must never
        # break usage parsing.
        pass
    return None


def short_model_name(full: str | None) -> str:
    """Trim a full model id to a short tag for the tracker.

        'tokenator/claude-opus-4-8'   -> 'opus-4-8'
        'vibecode/claude-sonnet-4.6'  -> 'sonnet-4.6'
        'claude-opus-4-8'             -> 'opus-4-8'

    Returns '' when full is falsy so callers can omit the ' · ' suffix.
    """
    if not full:
        return ""
    name = str(full).strip()
    # Drop any provider prefix up to and including the last '/'.
    if "/" in name:
        name = name.rsplit("/", 1)[-1]
    # Drop a leading 'claude-' marketing prefix.
    if name.startswith("claude-"):
        name = name[len("claude-"):]
    return name


def _extract_last_json_object(text: str) -> dict | None:
    """Return the last top-level JSON object in `text` that parses.

    Robust to log lines or other text emitted before/after the JSON. Returns
    None when no JSON object can be decoded anywhere in the text.

    The text is scanned left-to-right: at every '{' the stdlib decoder is
    asked to decode one value starting there. A successful decode is
    remembered and the scan resumes after it, so objects nested inside an
    already-decoded object are never considered on their own. A failed decode
    just moves on to the next '{', which lets objects embedded in a truncated
    or garbled outer object still be found. The scan has to run forwards
    because JSON string escapes (an escaped quote, or a string that begins
    with an escape sequence) can only be interpreted left-to-right, and the
    result object's free-text fields routinely contain quotes and braces.
    """
    # Fast path: the whole stripped text is the JSON.
    stripped = text.strip()
    try:
        obj = json.loads(stripped)
        if isinstance(obj, dict):
            return obj
    except (ValueError, TypeError):
        pass

    decoder = json.JSONDecoder()
    last = None
    pos = text.find("{")
    while pos != -1:
        try:
            obj, stop = decoder.raw_decode(text, pos)
        except ValueError:
            # Not a JSON object starting here; try the next opening brace.
            pos = text.find("{", pos + 1)
            continue
        # A value decoded from a position holding '{' is always an object.
        last = obj
        pos = text.find("{", stop)
    return last


def parse_usage_from_log(path: str) -> dict | None:
    """Read a run log file and parse usage from it. Never raises.

    Returns the dict produced by parse_usage_from_text, or None when the file
    cannot be read (a warning is logged) or holds no usable result JSON.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()
    except (OSError, TypeError, ValueError) as e:
        # TypeError / ValueError cover a None path or one with an embedded
        # NUL byte, which open() rejects before touching the filesystem.
        logger.warning(f"parse_usage_from_log: cannot read {path}: {e}")
        return None
    return parse_usage_from_text(text)


def record_usage(run_id: int, usage: dict | None) -> None:
    """Write parsed usage onto the agent_runs row. NULLs if usage is None.

    The model column is only overwritten when a model was parsed
    (COALESCE keeps the stored value otherwise). The connection is always
    closed; errors raised by the database calls propagate to the caller.
    """
    if usage is None:
        logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
        usage = {}
    conn = get_db()
    try:
        conn.execute(
            "UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
            "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
            "model=COALESCE(?, model) WHERE id=?",
            (
                usage.get("input_tokens"),
                usage.get("output_tokens"),
                usage.get("cache_read_tokens"),
                usage.get("cache_creation_tokens"),
                usage.get("cost_usd"),
                usage.get("model"),
                run_id,
            ),
        )
        conn.commit()
    finally:
        conn.close()


def fmt_tokens(n) -> str:
    """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.

    Anything that cannot be converted to an int is rendered as '0'.
    """
    try:
        n = int(n or 0)
    except (TypeError, ValueError, OverflowError):
        n = 0
    if n >= 1_000_000:
        return f"{n / 1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n / 1_000:.1f}k"
    return str(n)


def fmt_cost(c) -> str:
    """Format USD cost with 2 decimals: '$0.21'.

    Anything that cannot be converted to a float is rendered as '$0.00'.
    """
    try:
        c = float(c or 0.0)
    except (TypeError, ValueError, OverflowError):
        c = 0.0
    return f"${c:.2f}"


def fmt_duration(seconds) -> str:
    """Format an integer second count for the agent-finish status comment (ORCH-016).

    Contract (ADR-001 §8 / AC-13):
        0..59     -> '{s}s'            (e.g. '0s', '12s', '59s')
        60..3599  -> '{m}m {ss:02d}s'  (e.g. '1m 00s', '4m 12s', '59m 59s')
        >= 3600   -> '{h}h {mm:02d}m'  (seconds dropped; e.g. '1h 00m', '2h 47m')
        None / non-int / negative -> '' so the caller drops the
        'Длительность:' (duration) line.

    Pure function: no I/O, no DB.
    """
    try:
        if seconds is None:
            return ""
        s = int(seconds)
    except (TypeError, ValueError, OverflowError):
        return ""
    if s < 0:
        return ""
    if s < 60:
        return f"{s}s"
    if s < 3600:
        m, ss = divmod(s, 60)
        return f"{m}m {ss:02d}s"
    h, rem = divmod(s, 3600)
    mm = rem // 60
    return f"{h}h {mm:02d}m"


def get_agent_duration(task_id, agent: str) -> int | None:
    """Last finished agent_runs duration (seconds) for (task_id, agent) — DB fallback.

    ORCH-016 / ADR-001 §6: used by build_status_comment when the caller does
    NOT pass an explicit duration_s (chiefly the analyst path, which builds
    its comment from stage_engine where _duration_s is not in scope).

    Reads the last finished row for (task_id, agent) via:
        SELECT CAST((julianday(finished_at) - julianday(started_at)) * 86400 AS INTEGER)
        FROM agent_runs
        WHERE task_id=? AND agent=? AND finished_at IS NOT NULL
        ORDER BY id DESC LIMIT 1

    Returns None on any of:
      - missing task_id / agent,
      - no matching row (or finished_at IS NULL),
      - computed value < 0 (clock skew),
      - DB error (logged at debug, never re-raised).

    This is the hot comment path — a locked / stale DB must never crash a
    finishing agent.
    """
    if not task_id or not agent:
        return None
    try:
        conn = get_db()
    except Exception as e:
        logger.debug(f"get_agent_duration: cannot open DB for ({task_id},{agent}): {e}")
        return None
    try:
        row = conn.execute(
            "SELECT CAST((julianday(finished_at) - julianday(started_at)) * 86400 AS INTEGER) "
            "FROM agent_runs WHERE task_id=? AND agent=? AND finished_at IS NOT NULL "
            "ORDER BY id DESC LIMIT 1",
            (task_id, agent),
        ).fetchone()
    except Exception as e:
        logger.debug(f"get_agent_duration: query failed for ({task_id},{agent}): {e}")
        return None
    finally:
        # Closing is best-effort too: a failing close must not mask the result.
        try:
            conn.close()
        except Exception:
            pass
    if not row or row[0] is None:
        return None
    try:
        secs = int(row[0])
    except (TypeError, ValueError):
        return None
    if secs < 0:
        return None
    return secs


# Pretty agent names for comments (mirrors STAGE_AUTHORS roles).
AGENT_DISPLAY = { "analyst": "Analyst", "architect": "Architect", "developer": "Developer", "reviewer": "Reviewer", "tester": "Tester", "deployer": "Deployer", } def _input_total(usage: dict) -> int: """FULL input = fresh input + cache-read + cache-creation tokens.""" def _i(k): try: return int(usage.get(k) or 0) except (TypeError, ValueError): return 0 return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens") def _cached_total(usage: dict) -> int: """Cached portion of the input = cache-read + cache-creation tokens.""" def _i(k): try: return int(usage.get(k) or 0) except (TypeError, ValueError): return 0 return _i("cache_read_tokens") + _i("cache_creation_tokens") def fmt_in(usage: dict) -> str: """Render the input figure as full total with a cached breakdown. '8.5M in (8.4M cached)' when there is a cache; '45.2k in' when cached==0. """ total = _input_total(usage) cached = _cached_total(usage) if cached > 0: return f"{fmt_tokens(total)} in ({fmt_tokens(cached)} cached)" return f"{fmt_tokens(total)} in" def usage_comment( agent: str, usage: dict | None, repo: str | None = None, branch: str | None = None, work_item_id: str | None = None, pr_number=None, ) -> str: """DEPRECATED (ORCH-016 / ADR-001 §1): thin wrapper around build_status_comment. The historical one-line "{icon} Role готов · 8.5M in / 45.8k out · $7.29 + links" format has been replaced by the unified status-comment format. This wrapper is kept only so that legacy callers (notably the test suite in ``tests/test_usage.py``) keep working; new code MUST call ``build_status_comment(...)`` directly. There is no ``duration_s`` parameter here because the old signature did not carry it. """ return build_status_comment( agent, repo=repo, branch=branch, work_item_id=work_item_id, pr_number=pr_number, usage=usage, ) # Per-agent artifact file under docs/work-items/{wid}/ (architect/developer/ # deployer use special handling for ADR dirs, PR links, or staging logs — # see artifact_links()). 
AGENT_ARTIFACT = { "reviewer": ("Review", "12-review.md"), "tester": ("Test report", "13-test-report.md"), "deployer": ("Deploy log", "14-deploy-log.md"), } def artifact_links( agent: str, repo: str | None, branch: str | None, work_item_id: str | None, pr_number=None, *, stage: str | None = None, worktree_root: str | None = None, ) -> list[str]: """HTML
  • ...
  • link fragments for the finishing agent's artifacts. ORCH-016 (ADR-001 §7) breaking change: this function now emits HTML anchor fragments to feed straight into the