Files
orchestrator/src/usage.py

900 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Feature 4 + ORCH-016: token / cost accounting and unified status comments.
claude --output-format json emits a single result JSON object at the end of the
run log with fields:
total_cost_usd
usage.input_tokens / output_tokens / cache_read_input_tokens /
cache_creation_input_tokens
modelUsage, num_turns, duration_ms
This module parses that JSON out of a (text-or-json) run log, records the usage
on the agent_runs row, and builds:
- per-agent status comments via build_status_comment(...) — the ORCH-016
unified format replacing the legacy usage_comment(...) and the analyst-
only stage_engine._build_analyst_ready_comment(...). Every agent now flows
through the same hot path.
- per-task summary the Deployer posts on deploy/done.
Everything here is defensive: a missing/garbled JSON never raises — we record
NULL/0 and log a warning so a broken agent run can't crash the monitor. The
status-comment hot path likewise NEVER raises (self-hosting risk R-1).
"""
import json
import logging
from .db import get_db
logger = logging.getLogger("orchestrator.usage")
def parse_usage_from_text(text: str) -> dict | None:
    """Extract the claude result-JSON usage figures from a run log's text.

    The log may contain plain text before/after the JSON; the last JSON object
    found by ``_extract_last_json_object`` is treated as the result object.

    Returns a normalised dict with the keys
        input_tokens, output_tokens, cache_read_tokens,
        cache_creation_tokens (ints, missing/garbled -> 0),
        cost_usd (float, missing/garbled -> 0.0),
        model (full model id string, or None when unknown),
    or None when the text is empty, no JSON object is found, or the object
    carries neither a ``usage`` block nor a cost field.
    """
    if not text:
        return None
    candidate = _extract_last_json_object(text)
    if candidate is None:
        return None

    usage = candidate.get("usage") or {}
    if not isinstance(usage, dict):
        usage = {}

    cost = candidate.get("total_cost_usd")
    if cost is None:
        cost = candidate.get("cost_usd")

    # If there is neither a usage block nor a cost, this isn't a result object.
    if not usage and cost is None:
        return None

    def _int(v):
        # OverflowError: json.loads accepts the literal ``Infinity``, and
        # int(float("inf")) raises OverflowError rather than ValueError.
        try:
            return int(v)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _float(v):
        # OverflowError: float() of an integer too large for a double.
        try:
            return float(v)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def _first(*keys):
        # First non-None value among the aliases. A preferred key that is
        # present but null must not hide the legacy key (same None-based
        # fallback rule used for total_cost_usd / cost_usd above).
        for key in keys:
            value = usage.get(key)
            if value is not None:
                return value
        return None

    return {
        "input_tokens": _int(usage.get("input_tokens")),
        "output_tokens": _int(usage.get("output_tokens")),
        "cache_read_tokens": _int(
            _first("cache_read_input_tokens", "cache_read_tokens")
        ),
        # The cache-CREATION slice (writing new cache entries) is part of the
        # real input; it is persisted so the "X in" figure reflects the full
        # prompt size, not just fresh tokens.
        "cache_creation_tokens": _int(
            _first("cache_creation_input_tokens", "cache_creation_tokens")
        ),
        "cost_usd": _float(cost),
        # Full model id as reported by the result object (see _extract_model);
        # short_model_name() trims it for display. None when unknown.
        "model": _extract_model(candidate),
    }
def _extract_model(candidate: dict) -> str | None:
"""Best-effort: pull the model id out of a claude result JSON object.
Prefers modelUsage (a dict keyed by full model ids, e.g.
{"claude-opus-4-8": {...}}) and returns the key with the most output
tokens; falls back to a top-level "model" string. Never raises -> None.
"""
try:
mu = candidate.get("modelUsage")
if isinstance(mu, dict) and mu:
def _out(v):
try:
return int((v or {}).get("outputTokens", 0))
except (TypeError, ValueError, AttributeError):
return 0
best = max(mu.items(), key=lambda kv: _out(kv[1]))
if best and best[0]:
return str(best[0])
model = candidate.get("model")
if isinstance(model, str) and model:
return model
except Exception:
pass
return None
def short_model_name(full: str | None) -> str:
"""Trim a full model id to a short tag for the tracker.
'tokenator/claude-opus-4-8' -> 'opus-4-8'
'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
'claude-opus-4-8' -> 'opus-4-8'
Returns '' when full is falsy so callers can omit the ' · <model>' suffix.
"""
if not full:
return ""
name = str(full).strip()
# Drop any provider prefix up to and including the last '/'.
if "/" in name:
name = name.rsplit("/", 1)[-1]
# Drop a leading 'claude-' marketing prefix.
if name.startswith("claude-"):
name = name[len("claude-"):]
return name
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
Scans from the end for '}' and walks back to the matching '{' using a depth
counter (string-aware), trying json.loads on each candidate. Robust to log
lines or text emitted before the JSON.
"""
# Fast path: the whole stripped text is the JSON.
stripped = text.strip()
try:
obj = json.loads(stripped)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
# Otherwise find the last balanced { ... } block.
end = len(text)
while True:
close = text.rfind("}", 0, end)
if close == -1:
return None
depth = 0
in_str = False
esc = False
start = None
for i in range(close, -1, -1):
ch = text[i]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
continue
if ch == '"':
in_str = True
elif ch == "}":
depth += 1
elif ch == "{":
depth -= 1
if depth == 0:
start = i
break
if start is not None:
blob = text[start:close + 1]
try:
obj = json.loads(blob)
if isinstance(obj, dict):
return obj
except (ValueError, TypeError):
pass
end = close # keep scanning earlier in the text
def parse_usage_from_log(path: str) -> dict | None:
    """Read a run log file and parse usage from its text.

    A log that cannot be opened or read is logged as a warning and yields
    None. Besides OSError this covers the two ways ``open`` rejects the path
    itself: TypeError (e.g. ``path`` is None) and ValueError (embedded NUL
    byte). Undecodable bytes are replaced rather than raising. Parsing is
    delegated to ``parse_usage_from_text``.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()
    except (OSError, TypeError, ValueError) as e:
        logger.warning("parse_usage_from_log: cannot read %s: %s", path, e)
        return None
    # Kept outside the try so a parsing problem is not misreported as a
    # read failure.
    return parse_usage_from_text(text)
def record_usage(run_id: int, usage: dict | None):
    """Store parsed usage figures on the ``agent_runs`` row ``run_id``.

    With ``usage`` None a warning is logged and every figure is written as
    NULL. The ``model`` column is only overwritten when a model is present
    (``COALESCE(?, model)``). The connection is always closed.
    """
    if usage is None:
        logger.warning(f"run_id={run_id}: no usage JSON parsed, recording NULLs")
        usage = {}

    statement = (
        "UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
        "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
        "model=COALESCE(?, model) WHERE id=?"
    )
    # Order must match the placeholders in the statement above.
    fields = (
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "cache_creation_tokens",
        "cost_usd",
        "model",
    )

    conn = get_db()
    try:
        values = tuple(usage.get(field) for field in fields) + (run_id,)
        conn.execute(statement, values)
        conn.commit()
    finally:
        conn.close()
def fmt_tokens(n) -> str:
    """Format a token count compactly: 1234 -> '1.2k', 2_500_000 -> '2.5M'.

    ``n`` is coerced with ``int(n or 0)``; anything that cannot be coerced
    (including infinities) is treated as 0. Values below 1000 are printed
    as-is.
    """
    try:
        n = int(n or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")).
        n = 0
    # 999_950..999_999 would round to '1000.0k' with one decimal, so the
    # switch to 'M' happens at the rounding boundary, not at exactly 1e6.
    if n >= 999_950:
        return f"{n / 1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n / 1_000:.1f}k"
    return str(n)
def fmt_cost(c) -> str:
    """Render a USD amount with two decimals, e.g. '$0.21'.

    Falsy or non-numeric input is rendered as '$0.00'.
    """
    amount = 0.0
    try:
        amount = float(c or 0.0)
    except (TypeError, ValueError):
        pass  # keep the 0.0 default
    return f"${amount:.2f}"
def fmt_duration(seconds) -> str:
    """Format a second count for the agent-finish status comment (ORCH-016).

    Contract (ADR-001 §8 / AC-13):
        0..59     -> '{s}s'            (e.g. '0s', '12s', '59s')
        60..3599  -> '{m}m {ss:02d}s'  (e.g. '1m 00s', '4m 12s', '59m 59s')
        >= 3600   -> '{h}h {mm:02d}m'  (seconds dropped; e.g. '1h 00m', '2h 47m')

    None, negative values and anything ``int()`` cannot convert (including
    NaN and infinities) give '' so the caller drops the duration line.
    Values ``int()`` accepts are truncated by it (12.7 -> '12s').
    Pure function: no I/O, no DB.
    """
    if seconds is None:
        return ""
    try:
        s = int(seconds)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError also covers NaN.
        return ""
    if s < 0:
        return ""
    if s < 60:
        return f"{s}s"
    if s < 3600:
        m, ss = divmod(s, 60)
        return f"{m}m {ss:02d}s"
    h, rem = divmod(s, 3600)
    return f"{h}h {rem // 60:02d}m"
def get_agent_duration(task_id, agent: str) -> int | None:
"""Last finished agent_runs duration (seconds) for (task_id, agent) — DB fallback.
ORCH-016 / ADR-001 §6: used by build_status_comment when the caller does NOT
pass an explicit duration_s (chiefly the analyst path, which builds its
comment from stage_engine where _duration_s is not in scope).
Reads the last finished row for (task_id, agent) via:
SELECT CAST((julianday(finished_at) - julianday(started_at)) * 86400 AS INTEGER)
FROM agent_runs WHERE task_id=? AND agent=?
AND finished_at IS NOT NULL
ORDER BY id DESC LIMIT 1
Returns None on any of:
- missing task_id / agent,
- no matching row (or finished_at IS NULL),
- computed value < 0 (clock skew),
- DB error (logged at debug, never re-raised). This is the hot comment
path — a locked / stale DB must never crash a finishing agent.
"""
if not task_id or not agent:
return None
try:
conn = get_db()
except Exception as e:
logger.debug(f"get_agent_duration: cannot open DB for ({task_id},{agent}): {e}")
return None
try:
row = conn.execute(
"SELECT CAST((julianday(finished_at) - julianday(started_at)) * 86400 AS INTEGER) "
"FROM agent_runs WHERE task_id=? AND agent=? AND finished_at IS NOT NULL "
"ORDER BY id DESC LIMIT 1",
(task_id, agent),
).fetchone()
except Exception as e:
logger.debug(f"get_agent_duration: query failed for ({task_id},{agent}): {e}")
return None
finally:
try:
conn.close()
except Exception:
pass
if not row or row[0] is None:
return None
try:
secs = int(row[0])
except (TypeError, ValueError):
return None
if secs < 0:
return None
return secs
# Display names used in comments, keyed by agent id (mirrors STAGE_AUTHORS
# roles). Each display name is the agent id with its first letter capitalised.
AGENT_DISPLAY = {
    role: role.capitalize()
    for role in (
        "analyst",
        "architect",
        "developer",
        "reviewer",
        "tester",
        "deployer",
    )
}
def _input_total(usage: dict) -> int:
"""FULL input = fresh input + cache-read + cache-creation tokens."""
def _i(k):
try:
return int(usage.get(k) or 0)
except (TypeError, ValueError):
return 0
return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens")
def _cached_total(usage: dict) -> int:
"""Cached portion of the input = cache-read + cache-creation tokens."""
def _i(k):
try:
return int(usage.get(k) or 0)
except (TypeError, ValueError):
return 0
return _i("cache_read_tokens") + _i("cache_creation_tokens")
def fmt_in(usage: dict) -> str:
    """Render the input figure: full total plus a cached breakdown if any.

    '8.5M in (8.4M cached)' when the cached portion is positive, otherwise
    just '45.2k in'.
    """
    rendered = f"{fmt_tokens(_input_total(usage))} in"
    cached = _cached_total(usage)
    if cached > 0:
        rendered += f" ({fmt_tokens(cached)} cached)"
    return rendered
def usage_comment(
    agent: str,
    usage: dict | None,
    repo: str | None = None,
    branch: str | None = None,
    work_item_id: str | None = None,
    pr_number=None,
) -> str:
    """DEPRECATED (ORCH-016 / ADR-001 §1): forwards to build_status_comment.

    Kept so legacy callers keep working; new code should call
    ``build_status_comment(...)`` directly. The arguments are passed through
    unchanged as keyword arguments. There is no ``duration_s`` parameter
    because the old signature did not carry one.
    """
    forwarded = {
        "repo": repo,
        "branch": branch,
        "work_item_id": work_item_id,
        "pr_number": pr_number,
        "usage": usage,
    }
    return build_status_comment(agent, **forwarded)
# Per-agent artifact under docs/work-items/{wid}/, as (link label, file name).
# artifact_links() handles architect, developer and deployer in dedicated
# branches that return before this table is consulted, so the "deployer"
# entry below is not used by that function.
AGENT_ARTIFACT = dict(
    reviewer=("Review", "12-review.md"),
    tester=("Test report", "13-test-report.md"),
    deployer=("Deploy log", "14-deploy-log.md"),
)
def artifact_links(
    agent: str,
    repo: str | None,
    branch: str | None,
    work_item_id: str | None,
    pr_number=None,
    *,
    stage: str | None = None,
    worktree_root: str | None = None,
) -> list[str]:
    """HTML ``<li><a href="...">label</a></li>`` items for an agent's artifacts.

    The caller wraps the returned items in ``<ul>...</ul>``. The base URL is
    ``settings.gitea_public_url`` when set, else ``settings.gitea_url``; the
    owner is ``settings.gitea_owner`` (default "admin").

    Per agent:
        developer -> branch link (if ``branch``) + PR link (if ``pr_number``)
        architect -> ``06-adr`` directory of the work item
        deployer  -> 15-staging-log.md when stage is 'deploy-staging',
                     otherwise 14-deploy-log.md
        others    -> the file listed in AGENT_ARTIFACT, if any
    Everything except the developer links needs both ``branch`` and
    ``work_item_id``.

    When ``worktree_root`` is given, a candidate whose file/directory does
    not exist there is dropped; without it no existence check is made. A
    filesystem error during the check keeps the link.

    URLs and labels are HTML-escaped, so a branch name containing ``&``,
    ``<`` or ``"`` cannot break the anchor markup. Values without such
    characters render exactly as before.

    Never raises: on missing context or any error an empty list is returned
    (errors are logged at debug level).
    """
    try:
        import html as _html
        import os as _os

        from .config import settings

        owner = getattr(settings, "gitea_owner", "admin")
        base = (
            getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "")
        ).rstrip("/")
        if not base or not repo:
            return []

        repo_url = f"{base}/{owner}/{repo}"
        rel_dir = f"docs/work-items/{work_item_id}" if work_item_id else None

        def _li(url: str, label: str) -> str:
            # One list item; both the attribute value and the text are escaped.
            return (
                f'<li><a href="{_html.escape(url, quote=True)}">'
                f"{_html.escape(label, quote=False)}</a></li>"
            )

        def _present(rel_path: str, probe) -> bool:
            # Existence check inside the worktree; skipped without a worktree,
            # and a probing error keeps the link (best-effort).
            if not worktree_root:
                return True
            try:
                return probe(_os.path.join(worktree_root, rel_path))
            except Exception:
                return True

        if agent == "developer":
            items: list[str] = []
            if branch:
                items.append(_li(f"{repo_url}/src/branch/{branch}", f"Branch {branch}"))
            if pr_number:
                items.append(_li(f"{repo_url}/pulls/{pr_number}", f"PR #{pr_number}"))
            return items

        # Every remaining link points into the work-item directory on the branch.
        if not (branch and rel_dir):
            return []
        tree_url = f"{repo_url}/src/branch/{branch}"

        if agent == "architect":
            adr_rel = f"{rel_dir}/06-adr"
            if _present(adr_rel, _os.path.isdir):
                return [_li(f"{tree_url}/{adr_rel}", "ADR")]
            return []

        if agent == "deployer":
            # Stage-aware (ORCH-35 + ORCH-016 §2.4): 'deploy-staging' picks the
            # staging log; 'deploy' (or unknown) picks the deploy log.
            if (stage or "").strip() == "deploy-staging":
                fname, label = "15-staging-log.md", "Staging log"
            else:
                fname, label = "14-deploy-log.md", "Deploy log"
        else:
            spec = AGENT_ARTIFACT.get(agent)
            if not spec:
                return []
            label, fname = spec

        file_rel = f"{rel_dir}/{fname}"
        if _present(file_rel, _os.path.isfile):
            return [_li(f"{tree_url}/{file_rel}", label)]
        return []
    except Exception:
        # Links are optional decoration; never let them break the comment.
        logger.debug("artifact_links failed for agent=%s", agent, exc_info=True)
        return []
# Emoji shown in front of the agent name in status comments, keyed by agent id.
# Written as named Unicode escapes so the glyph is identifiable in source.
AGENT_ICON = {
    "analyst": "\N{LEFT-POINTING MAGNIFYING GLASS}",
    "architect": "\N{TRIANGULAR RULER}",
    "developer": "\N{PERSONAL COMPUTER}",
    "reviewer": "\N{RIGHT-POINTING MAGNIFYING GLASS}",
    "tester": "\N{TEST TUBE}",
    "deployer": "\N{ROCKET}",
}
# ---------------------------------------------------------------------------
# ORCH-016: unified status comment for every agent (analyst..deployer)
# ---------------------------------------------------------------------------
# Per-agent one-line description used in the status comment header (ADR-001 §2).
# Trailing periods are kept to match the literal assertions in AC-1..AC-5.
# The values are user-facing Russian text emitted verbatim; the adjacent
# string fragments are concatenated by the parser into one string per agent.
# For "deployer", build_status_comment replaces this text when the stage is
# 'deploy-staging' or 'deploy'.
_AGENT_DESCRIPTIONS = {
    # Roughly: "Prepared BRD / TZ / Acceptance Criteria. To advance, move the
    # task to status Approved. To reject, write the reason in a comment and
    # move it to Rejected."
    "analyst": (
        "Подготовил BRD / "
        "ТЗ / Acceptance Criteria. "
        "Для продвижения "
        "переведите задачу "
        "в статус Approved. "
        "Для отклонения — "
        "напишите причину "
        "комментом и "
        "переведите в Rejected."
    ),
    # Roughly: "Finished the architecture work. See ADR below."
    "architect": (
        "Завершил "
        "архитектурную "
        "проработку. "
        "См. ADR ниже."
    ),
    # Roughly: "Finished development. See PR / branch below."
    "developer": (
        "Завершил "
        "разработку. "
        "См. PR / branch ниже."
    ),
    # Roughly: "Finished reviewing the changes."
    "reviewer": (
        "Завершил "
        "ревью "
        "изменений."
    ),
    # Roughly: "Finished the test run."
    "tester": (
        "Завершил "
        "прогон "
        "тестов."
    ),
    # Roughly: "Finished the deploy."
    "deployer": (
        "Завершил деплой."
    ),
}
# Analyst-specific candidate artifacts as (link label, file name) pairs; the
# files live in docs/work-items/<wid>/. _analyst_doc_items() emits one link
# per entry, in this order, skipping files missing from the worktree.
# Matches the legacy _build_analyst_ready_comment list 1:1 so the BUG-C
# regression test (tests/test_analyst_comment.py) keeps passing under the
# unified format.
_ANALYST_CANDIDATES = [
    ("Business request", "00-business-request.md"),
    ("BRD", "01-brd.md"),
    # Label is user-facing text: the Russian abbreviation followed by "(TRZ)".
    ("ТЗ (TRZ)", "02-trz.md"),
    ("Acceptance Criteria", "03-acceptance-criteria.md"),
    ("Test Plan", "04-test-plan.yaml"),
    ("UI Test Cases", "04b-ui-test-cases.md"),
]
def _read_verdict_line(
agent: str, stage: str | None, worktree_root: str | None, work_item_id: str | None
) -> str | None:
"""Render the optional Verdict / Status line for reviewer / tester / deployer.
Sources (machine-readable YAML frontmatter, via src/frontmatter.py):
reviewer -> 12-review.md verdict: -> 'Verdict: <VALUE>'
tester -> 13-test-report.md verdict: (or status:) -> 'Verdict: <VALUE>'
deployer -> deploy-staging -> 15-staging-log.md staging_status: -> 'Status: <VALUE>'
else (deploy) -> 14-deploy-log.md deploy_status: -> 'Status: <VALUE>'
Returns None (line suppressed) for analyst / architect / developer, when
the worktree is unknown, the work-item id is missing, the artifact file is
absent, or the relevant frontmatter key is not present. Never raises.
"""
if agent not in ("reviewer", "tester", "deployer"):
return None
if not worktree_root or not work_item_id:
return None
try:
import os as _os
from .frontmatter import read_frontmatter_value
base_dir = _os.path.join(worktree_root, "docs/work-items", work_item_id)
except Exception:
return None
if agent == "reviewer":
v = read_frontmatter_value(_os.path.join(base_dir, "12-review.md"), "verdict")
return f"Verdict: {v}" if v else None
if agent == "tester":
path = _os.path.join(base_dir, "13-test-report.md")
v = read_frontmatter_value(path, "verdict")
if not v:
v = read_frontmatter_value(path, "status")
return f"Verdict: {v}" if v else None
# deployer
if (stage or "").strip() == "deploy-staging":
v = read_frontmatter_value(
_os.path.join(base_dir, "15-staging-log.md"), "staging_status"
)
else:
v = read_frontmatter_value(
_os.path.join(base_dir, "14-deploy-log.md"), "deploy_status"
)
return f"Status: {v}" if v else None
def _analyst_doc_items(
repo: str, branch: str, work_item_id: str, worktree_root: str | None
) -> list[str]:
"""Build the analyst's <li><a>...</a></li> list (mirrors legacy behaviour).
Files absent from the worktree are skipped (graceful, as in BUG-C / PR #13).
"""
if not (repo and branch and work_item_id):
return []
from .config import settings as _settings
owner = getattr(_settings, "gitea_owner", "admin")
base = (
getattr(_settings, "gitea_public_url", "") or getattr(_settings, "gitea_url", "")
).rstrip("/")
if not base:
return []
rel_dir = f"docs/work-items/{work_item_id}"
items: list[str] = []
for label, fname in _ANALYST_CANDIDATES:
if worktree_root:
try:
import os as _os
if not _os.path.isfile(_os.path.join(worktree_root, rel_dir, fname)):
continue
except Exception:
# On filesystem error, fall through and link the candidate anyway
# (best-effort) rather than blanking the whole document list.
pass
href = f"{base}/{owner}/{repo}/src/branch/{branch}/{rel_dir}/{fname}"
items.append(f'<li><a href="{href}">{label}</a></li>')
return items
def _usage_tail(usage: dict | None) -> str | None:
"""Render the technical token/cost tail (``<sub>...</sub>``) or None when empty.
Format (ADR-001 §3): ``<sub>{fmt_in} / {out} out · {cost}</sub>``.
Returns None when usage is missing entirely AND all of the relevant
components are zero — i.e. nothing meaningful to print.
"""
if not usage:
return None
in_total = _input_total(usage)
try:
out = int(usage.get("output_tokens") or 0)
except (TypeError, ValueError):
out = 0
try:
cost = float(usage.get("cost_usd") or 0.0)
except (TypeError, ValueError):
cost = 0.0
if in_total == 0 and out == 0 and cost == 0.0:
return None
return f"<sub>{fmt_in(usage)} / {fmt_tokens(out)} out · {fmt_cost(cost)}</sub>"
def build_status_comment(
    agent: str,
    *,
    repo: str | None = None,
    branch: str | None = None,
    work_item_id: str | None = None,
    pr_number=None,
    stage: str | None = None,
    usage: dict | None = None,
    duration_s=None,
    task_id=None,
    worktree_root: str | None = None,
) -> str:
    """Build the unified per-agent finish comment (ORCH-016 / ADR-001).

    The comment is HTML; its sections are joined with ``<br>`` in this order,
    optional ones being omitted when they have no content:

        1. header: icon, display name and the agent's description
        2. verdict / status line (from _read_verdict_line)
        3. duration line
        4. documents block: ``<b>...</b><ul>`` + link items + ``</ul>``
        5. usage tail (from _usage_tail)

    Arguments (all keyword-only except ``agent``):
        agent          agent id; ids missing from the lookup tables get a
                       capitalised name, no icon and a generic description.
        repo/branch    passed to the link builders.
        work_item_id   passed to the verdict reader and the link builders.
        pr_number      passed to artifact_links.
        stage          for the deployer, 'deploy-staging' / 'deploy' select a
                       stage-specific description; also passed on to the
                       verdict reader and artifact_links.
        usage          parsed token/cost dict for the tail.
        duration_s     explicit duration in seconds; used when int() accepts
                       it and it is >= 0.
        task_id        when no usable duration_s was given and task_id is not
                       None, get_agent_duration(task_id, agent) is used.
        worktree_root  passed to the verdict reader and the link builders.

    Never raises: on any exception the error is logged and a degraded
    one-line header is returned instead.
    """
    try:
        display = AGENT_DISPLAY.get(agent, (agent or "agent").capitalize())
        glyph = AGENT_ICON.get(agent, "")
        blurb = _AGENT_DESCRIPTIONS.get(agent, "завершил стадию.")
        if agent == "deployer":
            # The deployer description depends on which deploy stage finished.
            stage_key = (stage or "").strip()
            if stage_key == "deploy-staging":
                blurb = "Завершил staging-деплой."
            elif stage_key == "deploy":
                blurb = "Завершил прод-деплой."

        # NOTE(review): name and description are joined with no separator
        # here; confirm this matches the literal assertions in AC-1..AC-5.
        sections: list[str] = [f"{glyph} {display}{blurb}"]

        verdict = _read_verdict_line(agent, stage, worktree_root, work_item_id)
        if verdict:
            sections.append(verdict)

        # Duration: explicit param wins; otherwise DB fallback (ADR-001 §6).
        seconds: int | None = None
        if duration_s is not None:
            try:
                explicit = int(duration_s)
            except (TypeError, ValueError):
                explicit = -1
            if explicit >= 0:
                seconds = explicit
        if seconds is None and task_id is not None:
            seconds = get_agent_duration(task_id, agent)
        duration_text = fmt_duration(seconds)
        if duration_text:
            sections.append("Длительность: " + duration_text)

        # Documents block (the analyst has its own, longer candidate list).
        if agent == "analyst":
            links = _analyst_doc_items(
                repo or "", branch or "", work_item_id or "", worktree_root
            )
        else:
            links = artifact_links(
                agent,
                repo,
                branch,
                work_item_id,
                pr_number,
                stage=stage,
                worktree_root=worktree_root,
            )
        if links:
            sections.append("".join(["<b>Документы:</b><ul>", *links, "</ul>"]))

        tail = _usage_tail(usage)
        if tail:
            sections.append(tail)

        return "<br>".join(sections)
    except Exception as e:  # defensive — R-1 fallback
        logger.exception(f"build_status_comment failed for agent={agent}: {e}")
        try:
            display = AGENT_DISPLAY.get(agent, str(agent).capitalize())
            glyph = AGENT_ICON.get(agent, "")
            return f"{glyph} {display} готов"
        except Exception:
            return "✅ Agent готов"
def task_usage_summary(task_id: int) -> dict:
    """Aggregate ``agent_runs`` usage for one task, grouped by agent.

    Per agent the query sums the full input (input + cache_read +
    cache_creation), the cached portion (cache_read + cache_creation), the
    output tokens and the cost; COALESCE(..., 0) turns NULL sums into 0.

    Returns {total_in, total_cached, total_out, total_cost,
             per_agent: [(agent, in, cached, out, cost), ...]}
    with per_agent ordered by agent name and the totals summed over it.
    """
    query = (
        "SELECT agent, "
        "COALESCE(SUM(input_tokens),0) "
        " + COALESCE(SUM(cache_read_tokens),0) "
        " + COALESCE(SUM(cache_creation_tokens),0), "
        "COALESCE(SUM(cache_read_tokens),0) "
        " + COALESCE(SUM(cache_creation_tokens),0), "
        "COALESCE(SUM(output_tokens),0), "
        "COALESCE(SUM(cost_usd),0.0) "
        "FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent"
    )
    conn = get_db()
    try:
        fetched = conn.execute(query, (task_id,)).fetchall()
    finally:
        conn.close()

    per_agent = []
    total_in = total_cached = total_out = total_cost = 0
    for record in fetched:
        entry = (
            record[0],
            int(record[1]),
            int(record[2]),
            int(record[3]),
            float(record[4]),
        )
        per_agent.append(entry)
        total_in += entry[1]
        total_cached += entry[2]
        total_out += entry[3]
        total_cost += entry[4]

    return {
        "total_in": total_in,
        "total_cached": total_cached,
        "total_out": total_out,
        "total_cost": total_cost,
        "per_agent": per_agent,
    }
def task_summary_comment(task_id: int) -> str:
    """Build the Deployer end-of-task summary comment (Feature 4, variant B).

    The first line carries the task totals from task_usage_summary(); it is
    followed by one line per agent. In both, the cached breakdown in
    parentheses appears only when the cached count is positive. Lines are
    joined with newlines.
    """
    summary = task_usage_summary(task_id)

    cached_total = summary.get("total_cached", 0)
    headline_in = f"{fmt_tokens(summary['total_in'])} вход"
    if cached_total > 0:
        headline_in += f" ({fmt_tokens(cached_total)} cached)"

    report = [
        "\U0001f4ca Итого по задаче: "
        + headline_in
        + " / "
        + fmt_tokens(summary["total_out"])
        + " выход · "
        + fmt_cost(summary["total_cost"])
    ]

    for agent, tokens_in, tokens_cached, tokens_out, cost in summary["per_agent"]:
        label = AGENT_DISPLAY.get(agent, agent.capitalize())
        agent_in = f"{fmt_tokens(tokens_in)} in"
        if tokens_cached > 0:
            agent_in += f" ({fmt_tokens(tokens_cached)} cached)"
        report.append(
            f"{label}: {agent_in} / {fmt_tokens(tokens_out)} out · {fmt_cost(cost)}"
        )

    return "\n".join(report)