feat(telegram): live editable task tracker (Variant B+), replace 15-message spam
Replace the ~15 separate Telegram messages per task (agent start/finish, stage transition, QG-pending, tech noise) with ONE live tracker message edited in place (editMessageText) on every stage transition. Only attention-worthy events are still sent as SEPARATE, notifying messages: approve-gate, deploy-fail, agent-fail, task error. - db.py: idempotent ALTERs — tasks.tracker_message_id, tasks.title, tasks.brd_review_started_at/ended_at, agent_runs.model. Helpers for tracker message_id + BRD-review clock. - usage.py: short_model_name() (strip provider/claude- prefix); parse model from result-JSON modelUsage; record_usage persists model. - notifications.py: render_task_tracker(task_id) (stateless render from agent_runs), update_task_tracker (sendMessage->store id->editMessageText with fallback to a new message, silent), edit_telegram(). Per-stage line in↓/out↑·cost·model, ⏸️ Ревью БРД (human time), 💰 totals, finish block (⏱️ wall/agents/yours, 🔗 PR · 📦). notify_* are now tracker-only/log-only except the four alerts. - stage_engine.py: stamp brd_review_ended on analysis->architecture advance. - webhooks/plane.py: persist task title on creation. - tests/test_telegram_tracker.py: render, short_model_name, send/edit/fallback, separate-vs-silent alert behavior.
This commit is contained in:
84
src/db.py
84
src/db.py
@@ -90,6 +90,25 @@ def init_db():
|
||||
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
||||
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||||
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
|
||||
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
|
||||
# can render a short model tag per stage. Parsed from the run-log result JSON
|
||||
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
|
||||
_ensure_column(conn, "agent_runs", "model", "TEXT")
|
||||
# Telegram live tracker: one editable Telegram message per task. We store its
|
||||
# message_id so each stage transition can editMessageText the same message
|
||||
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
|
||||
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
|
||||
# Telegram live tracker: human-readable task title for the tracker header
|
||||
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
|
||||
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
|
||||
_ensure_column(conn, "tasks", "title", "TEXT")
|
||||
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
|
||||
# between "BRD ready / approve requested" and the analysis->architecture
|
||||
# advance (human flipped Plane to Approved). Persisted on the task so the
|
||||
# tracker can show "твоё время" without recomputing from activity history.
|
||||
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
||||
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -137,6 +156,71 @@ def update_task_stage(task_id: int, stage: str):
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Telegram live tracker helpers (feat/telegram-live-tracker)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_tracker_message_id(task_id: int) -> int | None:
|
||||
"""Return the stored Telegram tracker message_id for a task, or None."""
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return row[0] if row and row[0] is not None else None
|
||||
|
||||
|
||||
def set_tracker_message_id(task_id: int, message_id: int) -> None:
|
||||
"""Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET tracker_message_id=? WHERE id=?",
|
||||
(message_id, task_id),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_brd_review_started(task_id: int) -> None:
|
||||
"""Stamp when BRD review (the human approve gate) started, if not already set.
|
||||
|
||||
Idempotent: only sets it the first time (a retried analyst run must not reset
|
||||
the clock). The delta to brd_review_ended_at is the only "твоё время".
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET brd_review_started_at=datetime('now') "
|
||||
"WHERE id=? AND brd_review_started_at IS NULL",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_brd_review_ended(task_id: int) -> None:
|
||||
"""Stamp when BRD review ended (analysis->architecture advance / Approved).
|
||||
|
||||
Idempotent: only sets it the first time and only if a start exists.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET brd_review_ended_at=datetime('now') "
|
||||
"WHERE id=? AND brd_review_started_at IS NOT NULL "
|
||||
"AND brd_review_ended_at IS NULL",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
||||
|
||||
|
||||
@@ -1,6 +1,24 @@
|
||||
"""Notifications and logging for orchestrator events."""
|
||||
"""Notifications and logging for orchestrator events.
|
||||
|
||||
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
|
||||
messages per task (agent start / finish / stage transition / QG-pending / tech
|
||||
noise), the orchestrator now maintains ONE live tracker message per task that is
|
||||
edited in place (editMessageText) on every stage transition. Only events that
|
||||
NEED Slava's attention are sent as SEPARATE, notifying messages:
|
||||
|
||||
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
|
||||
* deploy failed / rolled back — send_telegram from launcher/engine
|
||||
* agent failed (exit_code != 0) — send_telegram from launcher
|
||||
* task error (notify_error)
|
||||
|
||||
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
|
||||
agent-start, agent-finish and QG-pending no longer emit their own messages — they
|
||||
just refresh the tracker (or are log-only).
|
||||
"""
|
||||
|
||||
import html
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger("orchestrator")
|
||||
@@ -17,25 +35,65 @@ def _get_settings():
|
||||
return _settings
|
||||
|
||||
|
||||
def send_telegram(text: str):
|
||||
"""Send notification to Telegram. Fire-and-forget, never raises."""
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Low-level Telegram primitives
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def send_telegram(text: str, disable_notification: bool = False):
|
||||
"""Send a notification to Telegram. Fire-and-forget, never raises.
|
||||
|
||||
Returns the Telegram message_id on success, else None (so callers that want
|
||||
to track the message — the tracker — can store it; legacy callers ignore it).
|
||||
"""
|
||||
s = _get_settings()
|
||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||
return
|
||||
return None
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
|
||||
httpx.post(
|
||||
resp = httpx.post(
|
||||
url,
|
||||
json={
|
||||
"chat_id": s.telegram_chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": False,
|
||||
"disable_notification": disable_notification,
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
data = resp.json()
|
||||
if data.get("ok"):
|
||||
return data["result"]["message_id"]
|
||||
except Exception:
|
||||
pass # Never crash orchestrator due to notification failure
|
||||
return None
|
||||
|
||||
|
||||
def edit_telegram(message_id: int, text: str) -> bool:
|
||||
"""Edit an existing Telegram message. Returns True on success, else False.
|
||||
|
||||
Used by the live tracker to refresh the single per-task message in place.
|
||||
Never raises. A False return tells the caller to fall back to a new message
|
||||
(e.g. the message is too old to edit / was deleted / 400).
|
||||
"""
|
||||
s = _get_settings()
|
||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||
return False
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
|
||||
resp = httpx.post(
|
||||
url,
|
||||
json={
|
||||
"chat_id": s.telegram_chat_id,
|
||||
"message_id": message_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
data = resp.json()
|
||||
return bool(data.get("ok"))
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _get_work_item_id(task_id: int) -> str:
|
||||
@@ -50,26 +108,318 @@ def _get_work_item_id(task_id: int) -> str:
|
||||
return f"task-{task_id}"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Live task tracker
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
# Pipeline stages shown in the tracker, in order, with their display label and
|
||||
# the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT
|
||||
# an agent stage — it is the human approve gate rendered between Analysis and
|
||||
# Architecture from the task's brd_review_* timestamps.
|
||||
_TRACKER_STAGES = [
|
||||
("analysis", "Analysis", "analyst"),
|
||||
("architecture", "Architecture", "architect"),
|
||||
("development", "Development", "developer"),
|
||||
("review", "Review", "reviewer"),
|
||||
("testing", "Testing", "tester"),
|
||||
("deploy", "Deploy", "deployer"),
|
||||
]
|
||||
|
||||
# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
|
||||
# (development is entered after architecture finishes, etc.) Used to render the
|
||||
# "🔄 <Stage> … идёт" line for the currently-active stage.
|
||||
_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД"
|
||||
|
||||
_STAGE_ACTIVE_AGENT = {
|
||||
"analysis": "analyst",
|
||||
"architecture": "architect",
|
||||
"development": "developer",
|
||||
"review": "reviewer",
|
||||
"testing": "tester",
|
||||
"deploy": "deployer",
|
||||
}
|
||||
|
||||
|
||||
def _fmt_minutes(seconds) -> str:
|
||||
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
|
||||
try:
|
||||
seconds = int(seconds or 0)
|
||||
except (TypeError, ValueError):
|
||||
seconds = 0
|
||||
if seconds <= 0:
|
||||
return "0м"
|
||||
if seconds < 60:
|
||||
return "<1м"
|
||||
return f"{seconds // 60}\u043c"
|
||||
|
||||
|
||||
def _parse_sql_ts(ts):
|
||||
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
|
||||
if not ts:
|
||||
return None
|
||||
from datetime import datetime, timezone
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||||
try:
|
||||
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def _duration_seconds(started, finished):
|
||||
"""Seconds between two SQL timestamps; None if either is missing/unparseable."""
|
||||
a = _parse_sql_ts(started)
|
||||
b = _parse_sql_ts(finished)
|
||||
if a is None or b is None:
|
||||
return None
|
||||
return max(int((b - a).total_seconds()), 0)
|
||||
|
||||
|
||||
def render_task_tracker(task_id: int) -> str:
|
||||
"""Build the full live-tracker text for a task from the DB (stateless render).
|
||||
|
||||
Pulls the task header (work_item_id, title, stage), every agent_runs row, and
|
||||
the BRD-review timestamps, then renders:
|
||||
- one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost> · <model>' line per finished
|
||||
stage (latest run per stage),
|
||||
- the '⏸️ Ревью БРД <dur> · твоё время[ ⏳]' line between Analysis/Architecture,
|
||||
- a '🔄 <Stage> … идёт' line for the active (in-progress) stage,
|
||||
- the '💰 <in>↓ / <out>↑ · <cost>' totals,
|
||||
- on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line.
|
||||
|
||||
Never raises (returns a minimal fallback string on error).
|
||||
"""
|
||||
from .db import get_db
|
||||
from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name
|
||||
|
||||
try:
|
||||
conn = get_db()
|
||||
task = conn.execute(
|
||||
"SELECT id, work_item_id, title, stage, created_at, updated_at, "
|
||||
"brd_review_started_at, brd_review_ended_at "
|
||||
"FROM tasks WHERE id=?",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if not task:
|
||||
conn.close()
|
||||
return f"task-{task_id}"
|
||||
runs = conn.execute(
|
||||
"SELECT agent, started_at, finished_at, exit_code, input_tokens, "
|
||||
"output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
|
||||
"FROM agent_runs WHERE task_id=? ORDER BY id ASC",
|
||||
(task_id,),
|
||||
).fetchall()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
|
||||
return f"task-{task_id}"
|
||||
|
||||
work_item_id = task["work_item_id"] or f"task-{task_id}"
|
||||
title = task["title"] or work_item_id
|
||||
stage = task["stage"] or "created"
|
||||
done = stage == "done"
|
||||
|
||||
# Latest completed run per agent (a stage may have multiple runs on retry;
|
||||
# we show the most recent FINISHED, successful run for the stage line).
|
||||
last_done = {}
|
||||
agent_runs_by_agent = {}
|
||||
for r in runs:
|
||||
agent_runs_by_agent.setdefault(r["agent"], []).append(r)
|
||||
if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
|
||||
last_done[r["agent"]] = r
|
||||
|
||||
# Totals across ALL runs (every input/output token + cost counts).
|
||||
total_in = 0
|
||||
total_out = 0
|
||||
total_cost = 0.0
|
||||
agent_seconds = 0
|
||||
for r in runs:
|
||||
usage = {
|
||||
"input_tokens": r["input_tokens"],
|
||||
"cache_read_tokens": r["cache_read_tokens"],
|
||||
"cache_creation_tokens": r["cache_creation_tokens"],
|
||||
}
|
||||
total_in += _input_total(usage)
|
||||
total_out += int(r["output_tokens"] or 0)
|
||||
total_cost += float(r["cost_usd"] or 0.0)
|
||||
d = _duration_seconds(r["started_at"], r["finished_at"])
|
||||
if d is not None:
|
||||
agent_seconds += d
|
||||
|
||||
esc_title = html.escape(title)
|
||||
header = (
|
||||
f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
|
||||
if done
|
||||
else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
|
||||
)
|
||||
bar = "\u2501" * 22
|
||||
lines = [header, bar]
|
||||
|
||||
def _stage_line(label, run):
|
||||
usage = {
|
||||
"input_tokens": run["input_tokens"],
|
||||
"cache_read_tokens": run["cache_read_tokens"],
|
||||
"cache_creation_tokens": run["cache_creation_tokens"],
|
||||
}
|
||||
in_tok = fmt_tokens(_input_total(usage))
|
||||
out_tok = fmt_tokens(run["output_tokens"])
|
||||
cost = fmt_cost(run["cost_usd"])
|
||||
dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
|
||||
model = short_model_name(run["model"])
|
||||
model_suffix = f" \u00b7 {model}" if model else ""
|
||||
return (
|
||||
f"\u2705 {label:<13} {dur} \u00b7 "
|
||||
f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
|
||||
)
|
||||
|
||||
# BRD review line: between Analysis and Architecture, only once Analysis has
|
||||
# produced a run (i.e. the gate is live). Time = human review delta.
|
||||
brd_started = task["brd_review_started_at"]
|
||||
brd_ended = task["brd_review_ended_at"]
|
||||
review_seconds = _duration_seconds(brd_started, brd_ended)
|
||||
|
||||
for stage_key, label, agent in _TRACKER_STAGES:
|
||||
run = last_done.get(agent)
|
||||
if run is not None:
|
||||
lines.append(_stage_line(label, run))
|
||||
elif _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key:
|
||||
# This stage is the active one and has no finished run yet.
|
||||
lines.append(f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442")
|
||||
# else: not started yet -> not shown.
|
||||
|
||||
# Insert the BRD review line right after Analysis.
|
||||
if stage_key == "analysis" and brd_started:
|
||||
brd_label = f"{_BRD_LABEL:<13}"
|
||||
if review_seconds is not None:
|
||||
dur = _fmt_minutes(review_seconds)
|
||||
lines.append(
|
||||
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
|
||||
)
|
||||
else:
|
||||
# Still waiting on the human (ended not stamped yet).
|
||||
from datetime import datetime, timezone
|
||||
start_dt = _parse_sql_ts(brd_started)
|
||||
waited = None
|
||||
if start_dt is not None:
|
||||
waited = int(
|
||||
(datetime.now(timezone.utc) - start_dt).total_seconds()
|
||||
)
|
||||
dur = _fmt_minutes(waited) if waited is not None else "\u2026"
|
||||
lines.append(
|
||||
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
|
||||
)
|
||||
|
||||
lines.append(bar)
|
||||
lines.append(
|
||||
f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
|
||||
f"{fmt_cost(total_cost)}"
|
||||
)
|
||||
|
||||
if done:
|
||||
wall = _duration_seconds(task["created_at"], task["updated_at"])
|
||||
wall_str = _fmt_minutes(wall) if wall is not None else "?"
|
||||
review_str = _fmt_minutes(review_seconds) if review_seconds else "0м"
|
||||
lines.append(
|
||||
f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
|
||||
f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
|
||||
f"\u0442\u0432\u043e\u0451 {review_str}"
|
||||
)
|
||||
link = _done_link(task_id, task["work_item_id"])
|
||||
if link:
|
||||
lines.append(link)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _done_link(task_id: int, work_item_id) -> str | None:
|
||||
"""Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None."""
|
||||
try:
|
||||
from .config import settings
|
||||
from .db import get_db
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if not row:
|
||||
return None
|
||||
repo, branch = row["repo"], row["branch"]
|
||||
pr_part = None
|
||||
try:
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
resp = httpx.get(
|
||||
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "all", "head": branch},
|
||||
headers=headers, timeout=5,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
prs = resp.json()
|
||||
if prs:
|
||||
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
|
||||
except Exception:
|
||||
pr_part = None
|
||||
parts = []
|
||||
if pr_part:
|
||||
parts.append(pr_part)
|
||||
parts.append("\U0001f4e6 deployed")
|
||||
return " \u00b7 ".join(parts)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def update_task_tracker(task_id: int):
|
||||
"""Render + push the live tracker for a task. Never raises.
|
||||
|
||||
First call (no stored tracker_message_id): sendMessage (silent) and store the
|
||||
returned message_id. Subsequent calls: editMessageText the stored message; if
|
||||
the edit fails (too old / deleted / 400), fall back to a NEW message and
|
||||
update the stored id. The tracker is always sent with disable_notification so
|
||||
it never pings — only the dedicated alert helpers ping.
|
||||
"""
|
||||
try:
|
||||
from .db import get_tracker_message_id, set_tracker_message_id
|
||||
text = render_task_tracker(task_id)
|
||||
mid = get_tracker_message_id(task_id)
|
||||
if mid is not None:
|
||||
if edit_telegram(mid, text):
|
||||
return
|
||||
# Edit failed -> fall back to a fresh message.
|
||||
new_mid = send_telegram(text, disable_notification=True)
|
||||
if new_mid is not None:
|
||||
set_tracker_message_id(task_id, new_mid)
|
||||
except Exception as e:
|
||||
logger.warning(f"update_task_tracker({task_id}) failed: {e}")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
|
||||
"""Log and notify stage transition."""
|
||||
"""Log a stage transition and refresh the live tracker (no separate message)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}"
|
||||
if agent:
|
||||
msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_agent_started(run_id: int, agent: str, task_id: int):
|
||||
"""Notify agent launch."""
|
||||
"""Log an agent launch and refresh the tracker (no separate message)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
|
||||
if task_id:
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
||||
"""Notify agent completion."""
|
||||
"""Log agent completion and refresh the tracker (no separate message).
|
||||
|
||||
The agent-FAILED alert (exit_code != 0) is still sent separately by the
|
||||
launcher via send_telegram; this helper itself only logs + refreshes.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
||||
if exit_code == 0:
|
||||
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
||||
@@ -79,47 +429,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int
|
||||
else:
|
||||
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
if task_id:
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
|
||||
"""Notify QG check result."""
|
||||
"""Log a QG check result (NO separate Telegram message: QG-pending is noise).
|
||||
|
||||
Kept for callers; QG outcomes are log-only now and reflected by the tracker
|
||||
through the resulting stage transition.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
if passed:
|
||||
msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed"
|
||||
logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
|
||||
else:
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
|
||||
|
||||
|
||||
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
|
||||
"""Log and notify QG check failure."""
|
||||
"""Log a QG check failure (log-only).
|
||||
|
||||
QG-pending / QG-failed are NOT pinged as separate messages anymore (they are
|
||||
not actionable for Slava). Real rollbacks/deploy-fails are alerted by their
|
||||
own dedicated send_telegram calls in the engine/launcher.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.warning(msg)
|
||||
send_telegram(msg)
|
||||
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
|
||||
|
||||
|
||||
def notify_approve_requested(task_id: int):
|
||||
"""Notify that analyst requests :approved:."""
|
||||
"""ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved.
|
||||
|
||||
Also starts the BRD-review clock and refreshes the tracker so the
|
||||
'⏸️ Ревью БРД · твоё время ⏳' line appears.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane"
|
||||
try:
|
||||
from .db import mark_brd_review_started
|
||||
mark_brd_review_started(task_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"notify_approve_requested: brd clock start failed: {e}")
|
||||
msg = (
|
||||
f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
|
||||
f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
|
||||
)
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
update_task_tracker(task_id)
|
||||
send_telegram(msg) # separate, notifying
|
||||
|
||||
|
||||
def notify_done(task_id: int):
|
||||
"""Notify task completion."""
|
||||
"""Task completion: refresh the tracker to its final ГОТОВО form (no separate ping)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!")
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_error(task_id: int, error: str):
|
||||
"""Log and notify error for a task."""
|
||||
"""ALERT (separate, notifying): task error."""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "system"
|
||||
msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}"
|
||||
logger.error(msg)
|
||||
send_telegram(msg)
|
||||
send_telegram(msg) # separate, notifying
|
||||
|
||||
@@ -240,6 +240,15 @@ def advance_stage(
|
||||
|
||||
# --- Advance ---------------------------------------------------------
|
||||
update_task_stage(task_id, next_stage)
|
||||
# Telegram live tracker: the analysis->architecture advance is the human
|
||||
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
|
||||
# human time). Idempotent: only the first stamp counts.
|
||||
if current_stage == "analysis" and next_stage == "architecture":
|
||||
try:
|
||||
from .db import mark_brd_review_ended
|
||||
mark_brd_review_ended(task_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Task {task_id}: brd review end stamp failed: {e}")
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
result.advanced = True
|
||||
|
||||
55
src/usage.py
55
src/usage.py
@@ -79,9 +79,60 @@ def parse_usage_from_text(text: str) -> dict | None:
|
||||
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
|
||||
),
|
||||
"cost_usd": _float(cost),
|
||||
# Telegram live tracker: the model the run actually used. claude
|
||||
# --output-format json reports it under modelUsage (a dict keyed by the
|
||||
# full model id) and/or a top-level "model" field. We keep the FULL name
|
||||
# here; short_model_name() trims it for the tracker. None when unknown.
|
||||
"model": _extract_model(candidate),
|
||||
}
|
||||
|
||||
|
||||
def _extract_model(candidate: dict) -> str | None:
|
||||
"""Best-effort: pull the model id out of a claude result JSON object.
|
||||
|
||||
Prefers modelUsage (a dict keyed by full model ids, e.g.
|
||||
{"claude-opus-4-8": {...}}) and returns the key with the most output
|
||||
tokens; falls back to a top-level "model" string. Never raises -> None.
|
||||
"""
|
||||
try:
|
||||
mu = candidate.get("modelUsage")
|
||||
if isinstance(mu, dict) and mu:
|
||||
def _out(v):
|
||||
try:
|
||||
return int((v or {}).get("outputTokens", 0))
|
||||
except (TypeError, ValueError, AttributeError):
|
||||
return 0
|
||||
best = max(mu.items(), key=lambda kv: _out(kv[1]))
|
||||
if best and best[0]:
|
||||
return str(best[0])
|
||||
model = candidate.get("model")
|
||||
if isinstance(model, str) and model:
|
||||
return model
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def short_model_name(full: str | None) -> str:
|
||||
"""Trim a full model id to a short tag for the tracker.
|
||||
|
||||
'tokenator/claude-opus-4-8' -> 'opus-4-8'
|
||||
'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
|
||||
'claude-opus-4-8' -> 'opus-4-8'
|
||||
Returns '' when full is falsy so callers can omit the ' · <model>' suffix.
|
||||
"""
|
||||
if not full:
|
||||
return ""
|
||||
name = str(full).strip()
|
||||
# Drop any provider prefix up to and including the last '/'.
|
||||
if "/" in name:
|
||||
name = name.rsplit("/", 1)[-1]
|
||||
# Drop a leading 'claude-' marketing prefix.
|
||||
if name.startswith("claude-"):
|
||||
name = name[len("claude-"):]
|
||||
return name
|
||||
|
||||
|
||||
def _extract_last_json_object(text: str) -> dict | None:
|
||||
"""Return the last balanced top-level JSON object in `text` that parses.
|
||||
|
||||
@@ -157,13 +208,15 @@ def record_usage(run_id: int, usage: dict | None):
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
||||
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=? WHERE id=?",
|
||||
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
|
||||
"model=COALESCE(?, model) WHERE id=?",
|
||||
(
|
||||
usage.get("input_tokens"),
|
||||
usage.get("output_tokens"),
|
||||
usage.get("cache_read_tokens"),
|
||||
usage.get("cache_creation_tokens"),
|
||||
usage.get("cost_usd"),
|
||||
usage.get("model"),
|
||||
run_id,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -494,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
# Insert task into DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id),
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
Reference in New Issue
Block a user