From 9a0298de9d76a27cc15d9ba26caaa92a17e13e04 Mon Sep 17 00:00:00 2001
From: dev-bot
Date: Thu, 4 Jun 2026 11:42:46 +0300
Subject: [PATCH] feat(telegram): live editable task tracker (Variant B+), replace 15-message spam
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replace the ~15 separate Telegram messages per task (agent start/finish,
stage transition, QG-pending, tech noise) with ONE live tracker message
edited in place (editMessageText) on every stage transition. Only
attention-worthy events are still sent as SEPARATE, notifying messages:
approve-gate, deploy-fail, agent-fail, task error.

- db.py: idempotent ALTERs — tasks.tracker_message_id, tasks.title,
  tasks.brd_review_started_at/ended_at, agent_runs.model. Helpers for
  tracker message_id + BRD-review clock.
- usage.py: short_model_name() (strip provider/claude- prefix); parse
  model from result-JSON modelUsage; record_usage persists model.
- notifications.py: render_task_tracker(task_id) (stateless render from
  agent_runs), update_task_tracker (sendMessage->store
  id->editMessageText with fallback to a new message, silent),
  edit_telegram(). Per-stage line in↓/out↑·cost·model, ⏸️ Ревью БРД
  (human time), 💰 totals, finish block (⏱️ wall/agents/yours, 🔗 PR · 📦).
  notify_* are now tracker-only/log-only except the four alerts.
- stage_engine.py: stamp brd_review_ended on analysis->architecture
  advance.
- webhooks/plane.py: persist task title on creation.
- tests/test_telegram_tracker.py: render, short_model_name,
  send/edit/fallback, separate-vs-silent alert behavior.
--- src/db.py | 84 +++++++ src/notifications.py | 433 ++++++++++++++++++++++++++++++--- src/stage_engine.py | 9 + src/usage.py | 55 ++++- src/webhooks/plane.py | 5 +- tests/test_telegram_tracker.py | 342 ++++++++++++++++++++++++++ 6 files changed, 893 insertions(+), 35 deletions(-) create mode 100644 tests/test_telegram_tracker.py diff --git a/src/db.py b/src/db.py index 4d64a68..e9c2260 100644 --- a/src/db.py +++ b/src/db.py @@ -90,6 +90,25 @@ def init_db(): # and the "X in" figure understates the true prompt size. Idempotent ALTER. _ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER") _ensure_column(conn, "agent_runs", "cost_usd", "REAL") + # Telegram live tracker (feat/telegram-live-tracker): persist the FULL model + # name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker + # can render a short model tag per stage. Parsed from the run-log result JSON + # (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER. + _ensure_column(conn, "agent_runs", "model", "TEXT") + # Telegram live tracker: one editable Telegram message per task. We store its + # message_id so each stage transition can editMessageText the same message + # instead of spamming a new one. Idempotent ALTER (safe on the live prod DB). + _ensure_column(conn, "tasks", "tracker_message_id", "INTEGER") + # Telegram live tracker: human-readable task title for the tracker header + # ("🛠️ ET-012 · "). Populated from the Plane work-item name at task + # creation; falls back to the work_item_id when absent. Idempotent ALTER. + _ensure_column(conn, "tasks", "title", "TEXT") + # Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta + # between "BRD ready / approve requested" and the analysis->architecture + # advance (human flipped Plane to Approved). Persisted on the task so the + # tracker can show "твоё время" without recomputing from activity history. 
+ _ensure_column(conn, "tasks", "brd_review_started_at", "TEXT") + _ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT") conn.commit() conn.close() @@ -137,6 +156,71 @@ def update_task_stage(task_id: int, stage: str): conn.close() +# --------------------------------------------------------------------------- +# Telegram live tracker helpers (feat/telegram-live-tracker) +# --------------------------------------------------------------------------- + +def get_tracker_message_id(task_id: int) -> int | None: + """Return the stored Telegram tracker message_id for a task, or None.""" + conn = get_db() + try: + row = conn.execute( + "SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,) + ).fetchone() + finally: + conn.close() + return row[0] if row and row[0] is not None else None + + +def set_tracker_message_id(task_id: int, message_id: int) -> None: + """Persist the Telegram tracker message_id for a task (idempotent overwrite).""" + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET tracker_message_id=? WHERE id=?", + (message_id, task_id), + ) + conn.commit() + finally: + conn.close() + + +def mark_brd_review_started(task_id: int) -> None: + """Stamp when BRD review (the human approve gate) started, if not already set. + + Idempotent: only sets it the first time (a retried analyst run must not reset + the clock). The delta to brd_review_ended_at is the only "твоё время". + """ + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET brd_review_started_at=datetime('now') " + "WHERE id=? AND brd_review_started_at IS NULL", + (task_id,), + ) + conn.commit() + finally: + conn.close() + + +def mark_brd_review_ended(task_id: int) -> None: + """Stamp when BRD review ended (analysis->architecture advance / Approved). + + Idempotent: only sets it the first time and only if a start exists. + """ + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET brd_review_ended_at=datetime('now') " + "WHERE id=? 
AND brd_review_started_at IS NOT NULL " + "AND brd_review_ended_at IS NULL", + (task_id,), + ) + conn.commit() + finally: + conn.close() + + def get_next_work_item_id(repo: str, prefix: str = "ET") -> str: """Generate next work item ID (e.g., ET-003 / ORCH-001). diff --git a/src/notifications.py b/src/notifications.py index ef885b3..a3af905 100644 --- a/src/notifications.py +++ b/src/notifications.py @@ -1,6 +1,24 @@ -"""Notifications and logging for orchestrator events.""" +"""Notifications and logging for orchestrator events. +feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram +messages per task (agent start / finish / stage transition / QG-pending / tech +noise), the orchestrator now maintains ONE live tracker message per task that is +edited in place (editMessageText) on every stage transition. Only events that +NEED Slava's attention are sent as SEPARATE, notifying messages: + + * approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved + * deploy failed / rolled back — send_telegram from launcher/engine + * agent failed (exit_code != 0) — send_telegram from launcher + * task error (notify_error) + +The tracker itself is edited SILENTLY (disable_notification: true). Stage-change, +agent-start, agent-finish and QG-pending no longer emit their own messages — they +just refresh the tracker (or are log-only). +""" + +import html import logging + import httpx logger = logging.getLogger("orchestrator") @@ -17,25 +35,65 @@ def _get_settings(): return _settings -def send_telegram(text: str): - """Send notification to Telegram. Fire-and-forget, never raises.""" +# --------------------------------------------------------------------------- # +# Low-level Telegram primitives +# --------------------------------------------------------------------------- # + +def send_telegram(text: str, disable_notification: bool = False): + """Send a notification to Telegram. Fire-and-forget, never raises. 
+ + Returns the Telegram message_id on success, else None (so callers that want + to track the message — the tracker — can store it; legacy callers ignore it). + """ s = _get_settings() if not s.telegram_bot_token or not s.telegram_chat_id: - return + return None try: url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage" - httpx.post( + resp = httpx.post( url, json={ "chat_id": s.telegram_chat_id, "text": text, "parse_mode": "HTML", - "disable_notification": False, + "disable_notification": disable_notification, }, timeout=5, ) + data = resp.json() + if data.get("ok"): + return data["result"]["message_id"] except Exception: pass # Never crash orchestrator due to notification failure + return None + + +def edit_telegram(message_id: int, text: str) -> bool: + """Edit an existing Telegram message. Returns True on success, else False. + + Used by the live tracker to refresh the single per-task message in place. + Never raises. A False return tells the caller to fall back to a new message + (e.g. the message is too old to edit / was deleted / 400). + """ + s = _get_settings() + if not s.telegram_bot_token or not s.telegram_chat_id: + return False + try: + url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText" + resp = httpx.post( + url, + json={ + "chat_id": s.telegram_chat_id, + "message_id": message_id, + "text": text, + "parse_mode": "HTML", + }, + timeout=5, + ) + data = resp.json() + return bool(data.get("ok")) + except Exception: + return False def _get_work_item_id(task_id: int) -> str: @@ -50,26 +108,318 @@ def _get_work_item_id(task_id: int) -> str: return f"task-{task_id}" +# --------------------------------------------------------------------------- # +# Live task tracker +# --------------------------------------------------------------------------- # + +# Pipeline stages shown in the tracker, in order, with their display label and +# the agent whose agent_runs rows describe that stage's work. 
"Ревью БРД" is NOT +# an agent stage — it is the human approve gate rendered between Analysis and +# Architecture from the task's brd_review_* timestamps. +_TRACKER_STAGES = [ + ("analysis", "Analysis", "analyst"), + ("architecture", "Architecture", "architect"), + ("development", "Development", "developer"), + ("review", "Review", "reviewer"), + ("testing", "Testing", "tester"), + ("deploy", "Deploy", "deployer"), +] + +# Map a pipeline stage -> the agent that is RUNNING while the task sits in it. +# (development is entered after architecture finishes, etc.) Used to render the +# "🔄 <Stage> … идёт" line for the currently-active stage. +_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД" + +_STAGE_ACTIVE_AGENT = { + "analysis": "analyst", + "architecture": "architect", + "development": "developer", + "review": "reviewer", + "testing": "tester", + "deploy": "deployer", +} + + +def _fmt_minutes(seconds) -> str: + """Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'.""" + try: + seconds = int(seconds or 0) + except (TypeError, ValueError): + seconds = 0 + if seconds <= 0: + return "0м" + if seconds < 60: + return "<1м" + return f"{seconds // 60}\u043c" + + +def _parse_sql_ts(ts): + """Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None.""" + if not ts: + return None + from datetime import datetime, timezone + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): + try: + return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc) + except (ValueError, TypeError): + continue + return None + + +def _duration_seconds(started, finished): + """Seconds between two SQL timestamps; None if either is missing/unparseable.""" + a = _parse_sql_ts(started) + b = _parse_sql_ts(finished) + if a is None or b is None: + return None + return max(int((b - a).total_seconds()), 0) + + +def render_task_tracker(task_id: int) -> str: + """Build the full live-tracker text for a task from the DB (stateless render). 
+ + Pulls the task header (work_item_id, title, stage), every agent_runs row, and + the BRD-review timestamps, then renders: + - one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost> · <model>' line per finished + stage (latest run per stage), + - the '⏸️ Ревью БРД <dur> · твоё время[ ⏳]' line between Analysis/Architecture, + - a '🔄 <Stage> … идёт' line for the active (in-progress) stage, + - the '💰 <in>↓ / <out>↑ · <cost>' totals, + - on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line. + + Never raises (returns a minimal fallback string on error). + """ + from .db import get_db + from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name + + try: + conn = get_db() + task = conn.execute( + "SELECT id, work_item_id, title, stage, created_at, updated_at, " + "brd_review_started_at, brd_review_ended_at " + "FROM tasks WHERE id=?", + (task_id,), + ).fetchone() + if not task: + conn.close() + return f"task-{task_id}" + runs = conn.execute( + "SELECT agent, started_at, finished_at, exit_code, input_tokens, " + "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model " + "FROM agent_runs WHERE task_id=? ORDER BY id ASC", + (task_id,), + ).fetchall() + conn.close() + except Exception as e: + logger.warning(f"render_task_tracker({task_id}) DB error: {e}") + return f"task-{task_id}" + + work_item_id = task["work_item_id"] or f"task-{task_id}" + title = task["title"] or work_item_id + stage = task["stage"] or "created" + done = stage == "done" + + # Latest completed run per agent (a stage may have multiple runs on retry; + # we show the most recent FINISHED, successful run for the stage line). + last_done = {} + agent_runs_by_agent = {} + for r in runs: + agent_runs_by_agent.setdefault(r["agent"], []).append(r) + if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None): + last_done[r["agent"]] = r + + # Totals across ALL runs (every input/output token + cost counts). 
+ total_in = 0 + total_out = 0 + total_cost = 0.0 + agent_seconds = 0 + for r in runs: + usage = { + "input_tokens": r["input_tokens"], + "cache_read_tokens": r["cache_read_tokens"], + "cache_creation_tokens": r["cache_creation_tokens"], + } + total_in += _input_total(usage) + total_out += int(r["output_tokens"] or 0) + total_cost += float(r["cost_usd"] or 0.0) + d = _duration_seconds(r["started_at"], r["finished_at"]) + if d is not None: + agent_seconds += d + + esc_title = html.escape(title) + header = ( + f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e" + if done + else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}" + ) + bar = "\u2501" * 22 + lines = [header, bar] + + def _stage_line(label, run): + usage = { + "input_tokens": run["input_tokens"], + "cache_read_tokens": run["cache_read_tokens"], + "cache_creation_tokens": run["cache_creation_tokens"], + } + in_tok = fmt_tokens(_input_total(usage)) + out_tok = fmt_tokens(run["output_tokens"]) + cost = fmt_cost(run["cost_usd"]) + dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"])) + model = short_model_name(run["model"]) + model_suffix = f" \u00b7 {model}" if model else "" + return ( + f"\u2705 {label:<13} {dur} \u00b7 " + f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}" + ) + + # BRD review line: between Analysis and Architecture, only once Analysis has + # produced a run (i.e. the gate is live). Time = human review delta. + brd_started = task["brd_review_started_at"] + brd_ended = task["brd_review_ended_at"] + review_seconds = _duration_seconds(brd_started, brd_ended) + + for stage_key, label, agent in _TRACKER_STAGES: + run = last_done.get(agent) + if run is not None: + lines.append(_stage_line(label, run)) + elif _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key: + # This stage is the active one and has no finished run yet. 
+ lines.append(f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442") + # else: not started yet -> not shown. + + # Insert the BRD review line right after Analysis. + if stage_key == "analysis" and brd_started: + brd_label = f"{_BRD_LABEL:<13}" + if review_seconds is not None: + dur = _fmt_minutes(review_seconds) + lines.append( + f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" + ) + else: + # Still waiting on the human (ended not stamped yet). + from datetime import datetime, timezone + start_dt = _parse_sql_ts(brd_started) + waited = None + if start_dt is not None: + waited = int( + (datetime.now(timezone.utc) - start_dt).total_seconds() + ) + dur = _fmt_minutes(waited) if waited is not None else "\u2026" + lines.append( + f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3" + ) + + lines.append(bar) + lines.append( + f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 " + f"{fmt_cost(total_cost)}" + ) + + if done: + wall = _duration_seconds(task["created_at"], task["updated_at"]) + wall_str = _fmt_minutes(wall) if wall is not None else "?" + review_str = _fmt_minutes(review_seconds) if review_seconds else "0м" + lines.append( + f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 " + f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 " + f"\u0442\u0432\u043e\u0451 {review_str}" + ) + link = _done_link(task_id, task["work_item_id"]) + if link: + lines.append(link) + + return "\n".join(lines) + + +def _done_link(task_id: int, work_item_id) -> str | None: + """Build the final '🔗 PR #n · 📦 deployed' line. 
Never raises -> None.""" + try: + from .config import settings + from .db import get_db + conn = get_db() + row = conn.execute( + "SELECT repo, branch FROM tasks WHERE id=?", (task_id,) + ).fetchone() + conn.close() + if not row: + return None + repo, branch = row["repo"], row["branch"] + pr_part = None + try: + owner = settings.gitea_owner + headers = {"Authorization": f"token {settings.gitea_token}"} + resp = httpx.get( + f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls", + params={"state": "all", "head": branch}, + headers=headers, timeout=5, + ) + if resp.status_code == 200: + prs = resp.json() + if prs: + pr_part = f"\U0001f517 PR #{prs[0].get('number')}" + except Exception: + pr_part = None + parts = [] + if pr_part: + parts.append(pr_part) + parts.append("\U0001f4e6 deployed") + return " \u00b7 ".join(parts) + except Exception: + return None + + +def update_task_tracker(task_id: int): + """Render + push the live tracker for a task. Never raises. + + First call (no stored tracker_message_id): sendMessage (silent) and store the + returned message_id. Subsequent calls: editMessageText the stored message; if + the edit fails (too old / deleted / 400), fall back to a NEW message and + update the stored id. The tracker is always sent with disable_notification so + it never pings — only the dedicated alert helpers ping. + """ + try: + from .db import get_tracker_message_id, set_tracker_message_id + text = render_task_tracker(task_id) + mid = get_tracker_message_id(task_id) + if mid is not None: + if edit_telegram(mid, text): + return + # Edit failed -> fall back to a fresh message. 
+ new_mid = send_telegram(text, disable_notification=True) + if new_mid is not None: + set_tracker_message_id(task_id, new_mid) + except Exception as e: + logger.warning(f"update_task_tracker({task_id}) failed: {e}") + + +# --------------------------------------------------------------------------- # +# Stage / agent lifecycle notifications (now tracker-only, no separate message) +# --------------------------------------------------------------------------- # + def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None): - """Log and notify stage transition.""" + """Log a stage transition and refresh the live tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" if agent: msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" logger.info(msg) - send_telegram(msg) + update_task_tracker(task_id) def notify_agent_started(run_id: int, agent: str, task_id: int): - """Notify agent launch.""" + """Log an agent launch and refresh the tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) - msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})" - logger.info(msg) - send_telegram(msg) + logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})") + if task_id: + update_task_tracker(task_id) def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None): - """Notify agent completion.""" + """Log agent completion and refresh the tracker (no separate message). + + The agent-FAILED alert (exit_code != 0) is still sent separately by the + launcher via send_telegram; this helper itself only logs + refreshes. + """ work_item_id = _get_work_item_id(task_id) if task_id else "?" 
if exit_code == 0: dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else "" @@ -79,47 +429,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int else: msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})" logger.info(msg) - send_telegram(msg) + if task_id: + update_task_tracker(task_id) def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None): - """Notify QG check result.""" + """Log a QG check result (NO separate Telegram message: QG-pending is noise). + + Kept for callers; QG outcomes are log-only now and reflected by the tracker + through the resulting stage transition. + """ work_item_id = _get_work_item_id(task_id) if passed: - msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed" + logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed") else: - msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}" - logger.info(msg) - send_telegram(msg) + logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") def notify_qg_failure(task_id: int, stage: str, check: str, reason: str): - """Log and notify QG check failure.""" + """Log a QG check failure (log-only). + + QG-pending / QG-failed are NOT pinged as separate messages anymore (they are + not actionable for Slava). Real rollbacks/deploy-fails are alerted by their + own dedicated send_telegram calls in the engine/launcher. + """ work_item_id = _get_work_item_id(task_id) - msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}" - logger.warning(msg) - send_telegram(msg) + logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") def notify_approve_requested(task_id: int): - """Notify that analyst requests :approved:.""" + """ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved. + + Also starts the BRD-review clock and refreshes the tracker so the + '⏸️ Ревью БРД · твоё время ⏳' line appears. 
+ """ work_item_id = _get_work_item_id(task_id) - msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane" + try: + from .db import mark_brd_review_started + mark_brd_review_started(task_id) + except Exception as e: + logger.warning(f"notify_approve_requested: brd clock start failed: {e}") + msg = ( + f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. " + f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved " + f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f." + ) logger.info(msg) - send_telegram(msg) + update_task_tracker(task_id) + send_telegram(msg) # separate, notifying def notify_done(task_id: int): - """Notify task completion.""" + """Task completion: refresh the tracker to its final ГОТОВО form (no separate ping).""" work_item_id = _get_work_item_id(task_id) - msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!" 
- logger.info(msg) - send_telegram(msg) + logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!") + update_task_tracker(task_id) def notify_error(task_id: int, error: str): - """Log and notify error for a task.""" + """ALERT (separate, notifying): task error.""" work_item_id = _get_work_item_id(task_id) if task_id else "system" msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}" logger.error(msg) - send_telegram(msg) + send_telegram(msg) # separate, notifying diff --git a/src/stage_engine.py b/src/stage_engine.py index 0fe4284..7d564b7 100644 --- a/src/stage_engine.py +++ b/src/stage_engine.py @@ -240,6 +240,15 @@ def advance_stage( # --- Advance --------------------------------------------------------- update_task_stage(task_id, next_stage) + # Telegram live tracker: the analysis->architecture advance is the human + # Approved gate clearing -> stamp the END of "Ревью БРД" (the only + # human time). Idempotent: only the first stamp counts. + if current_stage == "analysis" and next_stage == "architecture": + try: + from .db import mark_brd_review_ended + mark_brd_review_ended(task_id) + except Exception as e: + logger.warning(f"Task {task_id}: brd review end stamp failed: {e}") notify_stage_change(task_id, current_stage, next_stage) plane_notify_stage(work_item_id, current_stage, next_stage) result.advanced = True diff --git a/src/usage.py b/src/usage.py index ce37ce0..96bd25a 100644 --- a/src/usage.py +++ b/src/usage.py @@ -79,9 +79,60 @@ def parse_usage_from_text(text: str) -> dict | None: usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens")) ), "cost_usd": _float(cost), + # Telegram live tracker: the model the run actually used. claude + # --output-format json reports it under modelUsage (a dict keyed by the + # full model id) and/or a top-level "model" field. We keep the FULL name + # here; short_model_name() trims it for the tracker. None when unknown. 
+ "model": _extract_model(candidate), } +def _extract_model(candidate: dict) -> str | None: + """Best-effort: pull the model id out of a claude result JSON object. + + Prefers modelUsage (a dict keyed by full model ids, e.g. + {"claude-opus-4-8": {...}}) and returns the key with the most output + tokens; falls back to a top-level "model" string. Never raises -> None. + """ + try: + mu = candidate.get("modelUsage") + if isinstance(mu, dict) and mu: + def _out(v): + try: + return int((v or {}).get("outputTokens", 0)) + except (TypeError, ValueError, AttributeError): + return 0 + best = max(mu.items(), key=lambda kv: _out(kv[1])) + if best and best[0]: + return str(best[0]) + model = candidate.get("model") + if isinstance(model, str) and model: + return model + except Exception: + pass + return None + + +def short_model_name(full: str | None) -> str: + """Trim a full model id to a short tag for the tracker. + + 'tokenator/claude-opus-4-8' -> 'opus-4-8' + 'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6' + 'claude-opus-4-8' -> 'opus-4-8' + Returns '' when full is falsy so callers can omit the ' · <model>' suffix. + """ + if not full: + return "" + name = str(full).strip() + # Drop any provider prefix up to and including the last '/'. + if "/" in name: + name = name.rsplit("/", 1)[-1] + # Drop a leading 'claude-' marketing prefix. + if name.startswith("claude-"): + name = name[len("claude-"):] + return name + + def _extract_last_json_object(text: str) -> dict | None: """Return the last balanced top-level JSON object in `text` that parses. @@ -157,13 +208,15 @@ def record_usage(run_id: int, usage: dict | None): try: conn.execute( "UPDATE agent_runs SET input_tokens=?, output_tokens=?, " - "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=? 
WHERE id=?", + "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, " + "model=COALESCE(?, model) WHERE id=?", ( usage.get("input_tokens"), usage.get("output_tokens"), usage.get("cache_read_tokens"), usage.get("cache_creation_tokens"), usage.get("cost_usd"), + usage.get("model"), run_id, ), ) diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 1eb0f6b..cdcc2e7 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -494,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""): # Insert task into DB conn = get_db() conn.execute( - "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)", - (plane_id, work_item_id, repo, branch, "analysis", plane_id), + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (plane_id, work_item_id, repo, branch, "analysis", plane_id, name), ) conn.commit() conn.close() diff --git a/tests/test_telegram_tracker.py b/tests/test_telegram_tracker.py new file mode 100644 index 0000000..d563fa3 --- /dev/null +++ b/tests/test_telegram_tracker.py @@ -0,0 +1,342 @@ +"""feat/telegram-live-tracker: tests for the live Telegram task tracker. + +Covers (per DEV_TASK_TELEGRAM_TRACKER.md): + * short_model_name: provider/claude- prefix trimming. + * render_task_tracker: per-stage line format (in↓/out↑, model, cost, minutes), + the "⏸️ Ревью БРД · твоё время" line, the 💰 totals, and the finish block + (⏱️ three times + 🔗/📦). + * first message -> sendMessage stores message_id; transition -> editMessageText. + * fallback: editMessageText fails -> a NEW message is sent and the id updated. + * which alerts go out SEPARATELY (approve-gate / deploy-fail / agent-fail / + error) vs which do NOT (QG-pending / agent-start / stage-transition). + +Isolated temp DB; no network (httpx is patched). 
"""# ^ closes the module docstring that opens before this chunk (delimiter kept). """

import os
import tempfile
from contextlib import closing

# The settings module reads these at import time, so they must be in the
# environment BEFORE anything from ``src`` is imported below.
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")

_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_tracker.db")
os.environ["ORCH_DB_PATH"] = _test_db

from unittest.mock import MagicMock, patch  # noqa: E402

import pytest  # noqa: E402

import src.db as db_module  # noqa: E402
from src.db import init_db, get_db  # noqa: E402
from src import notifications as N  # noqa: E402
from src import usage as U  # noqa: E402


@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised database file.

    Points ``settings.db_path`` at the temp DB, removes any leftover file,
    runs ``init_db()``, and deletes the file again after the test.
    """
    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    # NOTE: this fixture does not touch send_telegram itself. Each test below
    # monkeypatches send_telegram / edit_telegram / httpx explicitly as needed.
    # (Per the original note, conftest stubs send_telegram to a no-op; that
    # file is not visible from here.)
    yield
    if os.path.exists(_test_db):
        os.unlink(_test_db)


# --------------------------------------------------------------------------- #
# helpers to build a task + runs in the DB
# --------------------------------------------------------------------------- #
def _mk_task(stage="development", title="\u0422\u0440\u0435\u043a\u0438 \u0441 \u0437\u0443\u043c\u0430 z5",
             wid="ET-012", brd_start=None, brd_end=None):
    """Insert one ``tasks`` row and return its id.

    The connection is closed even if the INSERT or commit raises, so a
    failing test cannot leave the SQLite file locked for the next one.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
            "brd_review_started_at, brd_review_ended_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            ("p1", wid, "enduro-trails", "feature/ET-012-x", stage, title,
             brd_start, brd_end),
        )
        conn.commit()
        return cur.lastrowid


def _mk_run(task_id, agent, started, finished, in_tok, out_tok,
            cache_read=0, cache_creation=0, cost=0.0, model=None, exit_code=0):
    """Insert one ``agent_runs`` row for *task_id* and return its id.

    Pass ``finished=None`` and ``exit_code=None`` to model a run that is
    still in progress.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
            "exit_code, input_tokens, output_tokens, cache_read_tokens, "
            "cache_creation_tokens, cost_usd, model) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (task_id, agent, started, finished, exit_code, in_tok, out_tok,
             cache_read, cache_creation, cost, model),
        )
        conn.commit()
        return cur.lastrowid


def _must_not_be_called(reason):
    """Return a stub that fails the test with *reason* if it is ever invoked."""
    def _stub(*args, **kwargs):
        raise AssertionError(reason)
    return _stub


# --------------------------------------------------------------------------- #
# short_model_name
# --------------------------------------------------------------------------- #
def test_short_model_name():
    """Provider and ``claude-`` prefixes are stripped; empty input gives ""."""
    assert U.short_model_name("tokenator/claude-opus-4-8") == "opus-4-8"
    assert U.short_model_name("vibecode/claude-sonnet-4.6") == "sonnet-4.6"
    assert U.short_model_name("claude-opus-4-8") == "opus-4-8"
    assert U.short_model_name("opus-4-8") == "opus-4-8"
    assert U.short_model_name(None) == ""
    assert U.short_model_name("") == ""


def test_parse_usage_extracts_model_from_modelusage():
    """The model name is taken from the key of the ``modelUsage`` object."""
    blob = (
        '{"total_cost_usd":0.01,'
        '"usage":{"input_tokens":10,"output_tokens":5},'
        '"modelUsage":{"claude-opus-4-8":{"inputTokens":10,"outputTokens":5}}}'
    )
    u = U.parse_usage_from_text(blob)
    assert u["model"] == "claude-opus-4-8"


# --------------------------------------------------------------------------- #
# render_task_tracker
# --------------------------------------------------------------------------- #
def test_render_in_progress_stage_lines_and_totals():
    """An in-progress task renders per-stage lines, BRD review and totals."""
    tid = _mk_task(stage="deploy", brd_start="2026-06-04 10:00:00",
                   brd_end="2026-06-04 10:08:00")
    opus = "tokenator/claude-opus-4-8"
    sonnet = "vibecode/claude-sonnet-4.6"
    # (agent, started, finished, in, out, cache_read, cost, model)
    # Analysis: 10 min, 1.1M in (mostly cache) / 39.6k out, $2.38, opus-4-8
    finished_runs = [
        ("analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
         1000, 39600, 1_100_000, 2.38, opus),
        ("architect", "2026-06-04 10:08:00", "2026-06-04 10:17:00",
         500, 34400, 1_500_000, 2.24, opus),
        ("developer", "2026-06-04 10:17:00", "2026-06-04 10:28:00",
         400, 45800, 8_400_000, 7.29, opus),
        ("reviewer", "2026-06-04 10:28:00", "2026-06-04 10:31:00",
         300, 12900, 1_200_000, 1.53, sonnet),
        ("tester", "2026-06-04 10:31:00", "2026-06-04 10:36:00",
         200, 19500, 1_200_000, 1.51, sonnet),
    ]
    for agent, started, finished, in_tok, out_tok, cache_read, cost, model in finished_runs:
        _mk_run(tid, agent, started, finished,
                in_tok=in_tok, out_tok=out_tok, cache_read=cache_read,
                cost=cost, model=model)
    # deployer started but not finished -> active "in progress" line.
    _mk_run(tid, "deployer", "2026-06-04 10:36:00", None,
            in_tok=0, out_tok=0, model=None, exit_code=None)

    text = N.render_task_tracker(tid)

    # Header in-progress
    assert text.startswith("\U0001f6e0\ufe0f ET-012 \u00b7 \u0422\u0440\u0435\u043a\u0438")
    # Per-stage format: in/out tokens, cost, model
    assert "\u2705 Analysis" in text
    assert "10\u043c" in text           # analysis duration ("10" + minute suffix)
    assert "39.6k\u2191" in text        # analysis out
    assert "$2.38" in text
    assert "opus-4-8" in text
    assert "sonnet-4.6" in text         # reviewer/tester model
    # BRD review line (human time, ended)
    assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
    assert "\u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" in text
    # Active stage
    assert "\U0001f504 Deploy" in text
    assert "\u0438\u0434\u0451\u0442" in text
    # Totals line present (money-bag emoji)
    assert "\U0001f4b0" in text
    # In-progress: no final stopwatch/"total" line
    assert "\u0412\u0441\u0435\u0433\u043e" not in text


def test_render_brd_review_waiting_shows_hourglass():
    """A started-but-not-ended BRD review shows the hourglass marker."""
    tid = _mk_task(stage="analysis", brd_start="2026-06-04 10:00:00",
                   brd_end=None)
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
            model="tokenator/claude-opus-4-8")
    text = N.render_task_tracker(tid)
    assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
    assert "\u23f3" in text  # hourglass while waiting


def test_render_done_has_times_and_links():
    """A finished task renders the done header, the three times and deploy line."""
    tid = _mk_task(stage="done", brd_start="2026-06-04 10:00:00",
                   brd_end="2026-06-04 10:08:00")
    # set created/updated to compute wall clock
    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE tasks SET created_at='2026-06-04 09:00:00', "
            "updated_at='2026-06-04 09:56:00' WHERE id=?", (tid,))
        conn.commit()
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
            model="tokenator/claude-opus-4-8")
    _mk_run(tid, "deployer", "2026-06-04 09:50:00", "2026-06-04 09:56:00",
            in_tok=400, out_tok=22400, cache_read=1_600_000, cost=1.73,
            model="tokenator/claude-opus-4-8")

    with patch("src.notifications.httpx") as _hx:
        # No PR found -> just the "deployed" package line
        _resp = MagicMock(status_code=200)
        _resp.json.return_value = []
        _hx.get.return_value = _resp
        text = N.render_task_tracker(tid)

    assert text.startswith("\U0001f389 ET-012")
    assert "\u0413\u041e\u0422\u041e\u0412\u041e" in text
    # stopwatch line with three times (total / agents / yours)
    assert "\u23f1\ufe0f" in text
    assert "\u0412\u0441\u0435\u0433\u043e" in text
    assert "\u0430\u0433\u0435\u043d\u0442\u044b" in text
    assert "\u0442\u0432\u043e\u0451" in text
    # package emoji: deployed line
    assert "\U0001f4e6" in text


def test_render_escapes_html_in_title():
    """HTML metacharacters in the task title are rendered as entities."""
    tid = _mk_task(stage="analysis", title="A <b>& B</b>")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.0)
    text = N.render_task_tracker(tid)
    # Assert the ESCAPED forms: a bare "<b>" / "&" check also passes when the
    # title is not escaped at all ("&" is a substring of "&amp;").
    assert "&lt;b&gt;" in text
    assert "&amp;" in text
    assert "<b>& B</b>" not in text  # raw markup must not leak through


def test_render_omits_model_when_unknown():
    """With no model recorded, the stage line ends at the cost figure."""
    tid = _mk_task(stage="analysis")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.0, model=None)
    text = N.render_task_tracker(tid)
    # No trailing model tag: the line ends at cost.
    analysis_line = next(
        (ln for ln in text.splitlines() if ln.startswith("\u2705 Analysis")),
        None,
    )
    assert analysis_line is not None, "no finished Analysis line rendered"
    assert analysis_line.rstrip().endswith("$0.00")


# --------------------------------------------------------------------------- #
# tracker send / edit / fallback
# --------------------------------------------------------------------------- #
def test_first_call_sends_message_and_stores_id(monkeypatch):
    """With no stored id, the tracker is sent silently and its id persisted."""
    tid = _mk_task(stage="analysis")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", None, in_tok=0, out_tok=0,
            exit_code=None)

    sent = {}

    def _fake_send(text, disable_notification=False):
        sent["text"] = text
        sent["silent"] = disable_notification
        return 555

    monkeypatch.setattr(N, "send_telegram", _fake_send)
    monkeypatch.setattr(N, "edit_telegram",
                        _must_not_be_called("should not edit on first call"))

    N.update_task_tracker(tid)

    assert db_module.get_tracker_message_id(tid) == 555
    assert sent["silent"] is True  # tracker is silent


def test_second_call_edits_existing_message(monkeypatch):
    """With a stored id, the existing message is edited and nothing is sent."""
    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    db_module.set_tracker_message_id(tid, 777)

    edited = {}

    def _fake_edit(mid, text):
        edited.update(mid=mid)
        return True

    monkeypatch.setattr(N, "edit_telegram", _fake_edit)
    monkeypatch.setattr(N, "send_telegram",
                        _must_not_be_called("should not send when edit succeeds"))

    N.update_task_tracker(tid)
    assert edited["mid"] == 777


def test_fallback_to_new_message_when_edit_fails(monkeypatch):
    """A failed edit falls back to a new message whose id replaces the old one."""
    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    db_module.set_tracker_message_id(tid, 100)

    monkeypatch.setattr(N, "edit_telegram", lambda mid, text: False)  # edit fails
    monkeypatch.setattr(N, "send_telegram",
                        lambda text, disable_notification=False: 200)

    N.update_task_tracker(tid)
    # id updated to the new message
    assert db_module.get_tracker_message_id(tid) == 200


# --------------------------------------------------------------------------- #
# which alerts are SEPARATE vs tracker-only
# --------------------------------------------------------------------------- #
def test_approve_gate_sends_separate_message_and_starts_brd_clock(monkeypatch):
    """The approve gate sends one notifying message and stamps the BRD start."""
    tid = _mk_task(stage="analysis")
    calls = []
    monkeypatch.setattr(
        N, "send_telegram",
        lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
    monkeypatch.setattr(N, "update_task_tracker", lambda task_id: None)

    N.notify_approve_requested(tid)

    # exactly one SEPARATE (notifying) send for the approve gate
    assert len(calls) == 1
    assert calls[0][1] is False  # notifying
    assert "Approved" in calls[0][0]
    # BRD clock started
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT brd_review_started_at FROM tasks WHERE id=?", (tid,)
        ).fetchone()
    assert row[0] is not None


def test_error_sends_separate_message(monkeypatch):
    """A task error sends exactly one notifying message containing "ERROR"."""
    tid = _mk_task(stage="development")
    calls = []
    monkeypatch.setattr(
        N, "send_telegram",
        lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
    N.notify_error(tid, "boom")
    assert len(calls) == 1
    assert calls[0][1] is False  # notifying
    assert "ERROR" in calls[0][0]


def test_stage_change_does_not_send_separate_message(monkeypatch):
    """A stage change refreshes the tracker and sends no separate message."""
    tid = _mk_task(stage="development")
    sent = []
    monkeypatch.setattr(
        N, "send_telegram",
        lambda text, disable_notification=False: sent.append(text) or 1)
    # tracker refresh is allowed (edit/send silent) but must NOT use send_telegram
    # for a separate notification; stub update to isolate.
    refreshed = []
    monkeypatch.setattr(N, "update_task_tracker",
                        lambda task_id: refreshed.append(task_id))

    N.notify_stage_change(tid, "development", "review")
    assert sent == []            # no separate message
    assert refreshed == [tid]    # tracker refreshed instead


def test_agent_started_does_not_send_separate_message(monkeypatch):
    """An agent start refreshes the tracker and sends no separate message."""
    tid = _mk_task(stage="analysis")
    sent = []
    monkeypatch.setattr(
        N, "send_telegram",
        lambda text, disable_notification=False: sent.append(text) or 1)
    refreshed = []
    monkeypatch.setattr(N, "update_task_tracker",
                        lambda task_id: refreshed.append(task_id))

    N.notify_agent_started(1, "analyst", tid)
    assert sent == []
    assert refreshed == [tid]


def test_qg_failure_does_not_send_separate_message(monkeypatch):
    """A quality-gate failure produces no send_telegram call."""
    tid = _mk_task(stage="development")
    sent = []
    monkeypatch.setattr(
        N, "send_telegram",
        lambda text, disable_notification=False: sent.append(text) or 1)
    N.notify_qg_failure(tid, "development", "check_ci_green", "CI state: pending")
    assert sent == []  # QG-pending is log-only, never a separate ping

# git format-patch signature trailer: -- 2.49.1