diff --git a/src/db.py b/src/db.py
index 4d64a68..e9c2260 100644
--- a/src/db.py
+++ b/src/db.py
@@ -90,6 +90,25 @@ def init_db():
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
+ # Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
+ # name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
+ # can render a short model tag per stage. Parsed from the run-log result JSON
+ # (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
+ _ensure_column(conn, "agent_runs", "model", "TEXT")
+ # Telegram live tracker: one editable Telegram message per task. We store its
+ # message_id so each stage transition can editMessageText the same message
+ # instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
+ _ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
+ # Telegram live tracker: human-readable task title for the tracker header
+ # ("🛠️ ET-012 ·
"). Populated from the Plane work-item name at task
+ # creation; falls back to the work_item_id when absent. Idempotent ALTER.
+ _ensure_column(conn, "tasks", "title", "TEXT")
+ # Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
+ # between "BRD ready / approve requested" and the analysis->architecture
+ # advance (human flipped Plane to Approved). Persisted on the task so the
+ # tracker can show "твоё время" without recomputing from activity history.
+ _ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
+ _ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
conn.commit()
conn.close()
@@ -137,6 +156,71 @@ def update_task_stage(task_id: int, stage: str):
conn.close()
+# ---------------------------------------------------------------------------
+# Telegram live tracker helpers (feat/telegram-live-tracker)
+# ---------------------------------------------------------------------------
+
+def get_tracker_message_id(task_id: int) -> int | None:
+ """Return the stored Telegram tracker message_id for a task, or None."""
+ conn = get_db()
+ try:
+ row = conn.execute(
+ "SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
+ ).fetchone()
+ finally:
+ conn.close()
+ return row[0] if row and row[0] is not None else None
+
+
+def set_tracker_message_id(task_id: int, message_id: int) -> None:
+ """Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
+ conn = get_db()
+ try:
+ conn.execute(
+ "UPDATE tasks SET tracker_message_id=? WHERE id=?",
+ (message_id, task_id),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+
+def mark_brd_review_started(task_id: int) -> None:
+ """Stamp when BRD review (the human approve gate) started, if not already set.
+
+ Idempotent: only sets it the first time (a retried analyst run must not reset
+ the clock). The delta to brd_review_ended_at is the only "твоё время".
+ """
+ conn = get_db()
+ try:
+ conn.execute(
+ "UPDATE tasks SET brd_review_started_at=datetime('now') "
+ "WHERE id=? AND brd_review_started_at IS NULL",
+ (task_id,),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+
+def mark_brd_review_ended(task_id: int) -> None:
+ """Stamp when BRD review ended (analysis->architecture advance / Approved).
+
+ Idempotent: only sets it the first time and only if a start exists.
+ """
+ conn = get_db()
+ try:
+ conn.execute(
+ "UPDATE tasks SET brd_review_ended_at=datetime('now') "
+ "WHERE id=? AND brd_review_started_at IS NOT NULL "
+ "AND brd_review_ended_at IS NULL",
+ (task_id,),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
diff --git a/src/notifications.py b/src/notifications.py
index ef885b3..a3af905 100644
--- a/src/notifications.py
+++ b/src/notifications.py
@@ -1,6 +1,24 @@
-"""Notifications and logging for orchestrator events."""
+"""Notifications and logging for orchestrator events.
+feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
+messages per task (agent start / finish / stage transition / QG-pending / tech
+noise), the orchestrator now maintains ONE live tracker message per task that is
+edited in place (editMessageText) on every stage transition. Only events that
+NEED Slava's attention are sent as SEPARATE, notifying messages:
+
+ * approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
+ * deploy failed / rolled back — send_telegram from launcher/engine
+ * agent failed (exit_code != 0) — send_telegram from launcher
+ * task error (notify_error)
+
+The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
+agent-start, agent-finish and QG-pending no longer emit their own messages — they
+just refresh the tracker (or are log-only).
+"""
+
+import html
import logging
+
import httpx
logger = logging.getLogger("orchestrator")
@@ -17,25 +35,65 @@ def _get_settings():
return _settings
-def send_telegram(text: str):
- """Send notification to Telegram. Fire-and-forget, never raises."""
+# --------------------------------------------------------------------------- #
+# Low-level Telegram primitives
+# --------------------------------------------------------------------------- #
+
+def send_telegram(text: str, disable_notification: bool = False):
+ """Send a notification to Telegram. Fire-and-forget, never raises.
+
+ Returns the Telegram message_id on success, else None (so callers that want
+ to track the message — the tracker — can store it; legacy callers ignore it).
+ """
s = _get_settings()
if not s.telegram_bot_token or not s.telegram_chat_id:
- return
+ return None
try:
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
- httpx.post(
+ resp = httpx.post(
url,
json={
"chat_id": s.telegram_chat_id,
"text": text,
"parse_mode": "HTML",
- "disable_notification": False,
+ "disable_notification": disable_notification,
},
timeout=5,
)
+ data = resp.json()
+ if data.get("ok"):
+ return data["result"]["message_id"]
except Exception:
pass # Never crash orchestrator due to notification failure
+ return None
+
+
+def edit_telegram(message_id: int, text: str) -> bool:
+ """Edit an existing Telegram message. Returns True on success, else False.
+
+ Used by the live tracker to refresh the single per-task message in place.
+ Never raises. A False return tells the caller to fall back to a new message
+ (e.g. the message is too old to edit / was deleted / 400).
+ """
+ s = _get_settings()
+ if not s.telegram_bot_token or not s.telegram_chat_id:
+ return False
+ try:
+ url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
+ resp = httpx.post(
+ url,
+ json={
+ "chat_id": s.telegram_chat_id,
+ "message_id": message_id,
+ "text": text,
+ "parse_mode": "HTML",
+ },
+ timeout=5,
+ )
+ data = resp.json()
+ return bool(data.get("ok"))
+ except Exception:
+ return False
def _get_work_item_id(task_id: int) -> str:
@@ -50,26 +108,318 @@ def _get_work_item_id(task_id: int) -> str:
return f"task-{task_id}"
+# --------------------------------------------------------------------------- #
+# Live task tracker
+# --------------------------------------------------------------------------- #
+
+# Pipeline stages shown in the tracker, in order, with their display label and
+# the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT
+# an agent stage — it is the human approve gate rendered between Analysis and
+# Architecture from the task's brd_review_* timestamps.
+_TRACKER_STAGES = [
+ ("analysis", "Analysis", "analyst"),
+ ("architecture", "Architecture", "architect"),
+ ("development", "Development", "developer"),
+ ("review", "Review", "reviewer"),
+ ("testing", "Testing", "tester"),
+ ("deploy", "Deploy", "deployer"),
+]
+
+# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
+# (development is entered after architecture finishes, etc.) Used to render the
+# "🔄 … идёт" line for the currently-active stage.
+_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД"
+
+_STAGE_ACTIVE_AGENT = {
+ "analysis": "analyst",
+ "architecture": "architect",
+ "development": "developer",
+ "review": "reviewer",
+ "testing": "tester",
+ "deploy": "deployer",
+}
+
+
+def _fmt_minutes(seconds) -> str:
+ """Render a duration in whole minutes: 0..59s -> '<1м', else 'м'."""
+ try:
+ seconds = int(seconds or 0)
+ except (TypeError, ValueError):
+ seconds = 0
+ if seconds <= 0:
+ return "0м"
+ if seconds < 60:
+ return "<1м"
+ return f"{seconds // 60}\u043c"
+
+
+def _parse_sql_ts(ts):
+ """Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
+ if not ts:
+ return None
+ from datetime import datetime, timezone
+ for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
+ try:
+ return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
+ except (ValueError, TypeError):
+ continue
+ return None
+
+
+def _duration_seconds(started, finished):
+ """Seconds between two SQL timestamps; None if either is missing/unparseable."""
+ a = _parse_sql_ts(started)
+ b = _parse_sql_ts(finished)
+ if a is None or b is None:
+ return None
+ return max(int((b - a).total_seconds()), 0)
+
+
+def render_task_tracker(task_id: int) -> str:
+ """Build the full live-tracker text for a task from the DB (stateless render).
+
+ Pulls the task header (work_item_id, title, stage), every agent_runs row, and
+ the BRD-review timestamps, then renders:
+ - one '✅ · ↓/↑ · · ' line per finished
+ stage (latest run per stage),
+ - the '⏸️ Ревью БРД · твоё время[ ⏳]' line between Analysis/Architecture,
+ - a '🔄 … идёт' line for the active (in-progress) stage,
+ - the '💰 ↓ / ↑ · ' totals,
+ - on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line.
+
+ Never raises (returns a minimal fallback string on error).
+ """
+ from .db import get_db
+ from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name
+
+ try:
+ conn = get_db()
+ task = conn.execute(
+ "SELECT id, work_item_id, title, stage, created_at, updated_at, "
+ "brd_review_started_at, brd_review_ended_at "
+ "FROM tasks WHERE id=?",
+ (task_id,),
+ ).fetchone()
+ if not task:
+ conn.close()
+ return f"task-{task_id}"
+ runs = conn.execute(
+ "SELECT agent, started_at, finished_at, exit_code, input_tokens, "
+ "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
+ "FROM agent_runs WHERE task_id=? ORDER BY id ASC",
+ (task_id,),
+ ).fetchall()
+ conn.close()
+ except Exception as e:
+ logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
+ return f"task-{task_id}"
+
+ work_item_id = task["work_item_id"] or f"task-{task_id}"
+ title = task["title"] or work_item_id
+ stage = task["stage"] or "created"
+ done = stage == "done"
+
+ # Latest completed run per agent (a stage may have multiple runs on retry;
+ # we show the most recent FINISHED, successful run for the stage line).
+ last_done = {}
+ agent_runs_by_agent = {}
+ for r in runs:
+ agent_runs_by_agent.setdefault(r["agent"], []).append(r)
+ if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
+ last_done[r["agent"]] = r
+
+ # Totals across ALL runs (every input/output token + cost counts).
+ total_in = 0
+ total_out = 0
+ total_cost = 0.0
+ agent_seconds = 0
+ for r in runs:
+ usage = {
+ "input_tokens": r["input_tokens"],
+ "cache_read_tokens": r["cache_read_tokens"],
+ "cache_creation_tokens": r["cache_creation_tokens"],
+ }
+ total_in += _input_total(usage)
+ total_out += int(r["output_tokens"] or 0)
+ total_cost += float(r["cost_usd"] or 0.0)
+ d = _duration_seconds(r["started_at"], r["finished_at"])
+ if d is not None:
+ agent_seconds += d
+
+ esc_title = html.escape(title)
+ header = (
+ f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
+ if done
+ else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
+ )
+ bar = "\u2501" * 22
+ lines = [header, bar]
+
+ def _stage_line(label, run):
+ usage = {
+ "input_tokens": run["input_tokens"],
+ "cache_read_tokens": run["cache_read_tokens"],
+ "cache_creation_tokens": run["cache_creation_tokens"],
+ }
+ in_tok = fmt_tokens(_input_total(usage))
+ out_tok = fmt_tokens(run["output_tokens"])
+ cost = fmt_cost(run["cost_usd"])
+ dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
+ model = short_model_name(run["model"])
+ model_suffix = f" \u00b7 {model}" if model else ""
+ return (
+ f"\u2705 {label:<13} {dur} \u00b7 "
+ f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
+ )
+
+ # BRD review line: between Analysis and Architecture, only once Analysis has
+ # produced a run (i.e. the gate is live). Time = human review delta.
+ brd_started = task["brd_review_started_at"]
+ brd_ended = task["brd_review_ended_at"]
+ review_seconds = _duration_seconds(brd_started, brd_ended)
+
+ for stage_key, label, agent in _TRACKER_STAGES:
+ run = last_done.get(agent)
+ if run is not None:
+ lines.append(_stage_line(label, run))
+ elif _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key:
+ # This stage is the active one and has no finished run yet.
+ lines.append(f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442")
+ # else: not started yet -> not shown.
+
+ # Insert the BRD review line right after Analysis.
+ if stage_key == "analysis" and brd_started:
+ brd_label = f"{_BRD_LABEL:<13}"
+ if review_seconds is not None:
+ dur = _fmt_minutes(review_seconds)
+ lines.append(
+ f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
+ )
+ else:
+ # Still waiting on the human (ended not stamped yet).
+ from datetime import datetime, timezone
+ start_dt = _parse_sql_ts(brd_started)
+ waited = None
+ if start_dt is not None:
+ waited = int(
+ (datetime.now(timezone.utc) - start_dt).total_seconds()
+ )
+ dur = _fmt_minutes(waited) if waited is not None else "\u2026"
+ lines.append(
+ f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
+ )
+
+ lines.append(bar)
+ lines.append(
+ f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
+ f"{fmt_cost(total_cost)}"
+ )
+
+ if done:
+ wall = _duration_seconds(task["created_at"], task["updated_at"])
+ wall_str = _fmt_minutes(wall) if wall is not None else "?"
+ review_str = _fmt_minutes(review_seconds) if review_seconds else "0м"
+ lines.append(
+ f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
+ f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
+ f"\u0442\u0432\u043e\u0451 {review_str}"
+ )
+ link = _done_link(task_id, task["work_item_id"])
+ if link:
+ lines.append(link)
+
+ return "\n".join(lines)
+
+
+def _done_link(task_id: int, work_item_id) -> str | None:
+ """Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None."""
+ try:
+ from .config import settings
+ from .db import get_db
+ conn = get_db()
+ row = conn.execute(
+ "SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
+ ).fetchone()
+ conn.close()
+ if not row:
+ return None
+ repo, branch = row["repo"], row["branch"]
+ pr_part = None
+ try:
+ owner = settings.gitea_owner
+ headers = {"Authorization": f"token {settings.gitea_token}"}
+ resp = httpx.get(
+ f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
+ params={"state": "all", "head": branch},
+ headers=headers, timeout=5,
+ )
+ if resp.status_code == 200:
+ prs = resp.json()
+ if prs:
+ pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
+ except Exception:
+ pr_part = None
+ parts = []
+ if pr_part:
+ parts.append(pr_part)
+ parts.append("\U0001f4e6 deployed")
+ return " \u00b7 ".join(parts)
+ except Exception:
+ return None
+
+
+def update_task_tracker(task_id: int):
+ """Render + push the live tracker for a task. Never raises.
+
+ First call (no stored tracker_message_id): sendMessage (silent) and store the
+ returned message_id. Subsequent calls: editMessageText the stored message; if
+ the edit fails (too old / deleted / 400), fall back to a NEW message and
+ update the stored id. The tracker is always sent with disable_notification so
+ it never pings — only the dedicated alert helpers ping.
+ """
+ try:
+ from .db import get_tracker_message_id, set_tracker_message_id
+ text = render_task_tracker(task_id)
+ mid = get_tracker_message_id(task_id)
+ if mid is not None:
+ if edit_telegram(mid, text):
+ return
+ # Edit failed -> fall back to a fresh message.
+ new_mid = send_telegram(text, disable_notification=True)
+ if new_mid is not None:
+ set_tracker_message_id(task_id, new_mid)
+ except Exception as e:
+ logger.warning(f"update_task_tracker({task_id}) failed: {e}")
+
+
+# --------------------------------------------------------------------------- #
+# Stage / agent lifecycle notifications (now tracker-only, no separate message)
+# --------------------------------------------------------------------------- #
+
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
- """Log and notify stage transition."""
+ """Log a stage transition and refresh the live tracker (no separate message)."""
work_item_id = _get_work_item_id(task_id)
msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}"
if agent:
msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})"
logger.info(msg)
- send_telegram(msg)
+ update_task_tracker(task_id)
def notify_agent_started(run_id: int, agent: str, task_id: int):
- """Notify agent launch."""
+ """Log an agent launch and refresh the tracker (no separate message)."""
work_item_id = _get_work_item_id(task_id)
- msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
- logger.info(msg)
- send_telegram(msg)
+ logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
+ if task_id:
+ update_task_tracker(task_id)
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
- """Notify agent completion."""
+ """Log agent completion and refresh the tracker (no separate message).
+
+ The agent-FAILED alert (exit_code != 0) is still sent separately by the
+ launcher via send_telegram; this helper itself only logs + refreshes.
+ """
work_item_id = _get_work_item_id(task_id) if task_id else "?"
if exit_code == 0:
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
@@ -79,47 +429,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int
else:
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
logger.info(msg)
- send_telegram(msg)
+ if task_id:
+ update_task_tracker(task_id)
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
- """Notify QG check result."""
+ """Log a QG check result (NO separate Telegram message: QG-pending is noise).
+
+ Kept for callers; QG outcomes are log-only now and reflected by the tracker
+ through the resulting stage transition.
+ """
work_item_id = _get_work_item_id(task_id)
if passed:
- msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed"
+ logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
else:
- msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
- logger.info(msg)
- send_telegram(msg)
+ logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
- """Log and notify QG check failure."""
+ """Log a QG check failure (log-only).
+
+ QG-pending / QG-failed are NOT pinged as separate messages anymore (they are
+ not actionable for Slava). Real rollbacks/deploy-fails are alerted by their
+ own dedicated send_telegram calls in the engine/launcher.
+ """
work_item_id = _get_work_item_id(task_id)
- msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
- logger.warning(msg)
- send_telegram(msg)
+ logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
def notify_approve_requested(task_id: int):
- """Notify that analyst requests :approved:."""
+ """ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved.
+
+ Also starts the BRD-review clock and refreshes the tracker so the
+ '⏸️ Ревью БРД · твоё время ⏳' line appears.
+ """
work_item_id = _get_work_item_id(task_id)
- msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane"
+ try:
+ from .db import mark_brd_review_started
+ mark_brd_review_started(task_id)
+ except Exception as e:
+ logger.warning(f"notify_approve_requested: brd clock start failed: {e}")
+ msg = (
+ f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
+ f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
+ f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
+ )
logger.info(msg)
- send_telegram(msg)
+ update_task_tracker(task_id)
+ send_telegram(msg) # separate, notifying
def notify_done(task_id: int):
- """Notify task completion."""
+ """Task completion: refresh the tracker to its final ГОТОВО form (no separate ping)."""
work_item_id = _get_work_item_id(task_id)
- msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
- logger.info(msg)
- send_telegram(msg)
+ logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!")
+ update_task_tracker(task_id)
def notify_error(task_id: int, error: str):
- """Log and notify error for a task."""
+ """ALERT (separate, notifying): task error."""
work_item_id = _get_work_item_id(task_id) if task_id else "system"
msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}"
logger.error(msg)
- send_telegram(msg)
+ send_telegram(msg) # separate, notifying
diff --git a/src/stage_engine.py b/src/stage_engine.py
index 0fe4284..7d564b7 100644
--- a/src/stage_engine.py
+++ b/src/stage_engine.py
@@ -240,6 +240,15 @@ def advance_stage(
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
+ # Telegram live tracker: the analysis->architecture advance is the human
+ # Approved gate clearing -> stamp the END of "Ревью БРД" (the only
+ # human time). Idempotent: only the first stamp counts.
+ if current_stage == "analysis" and next_stage == "architecture":
+ try:
+ from .db import mark_brd_review_ended
+ mark_brd_review_ended(task_id)
+ except Exception as e:
+ logger.warning(f"Task {task_id}: brd review end stamp failed: {e}")
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
result.advanced = True
diff --git a/src/usage.py b/src/usage.py
index ce37ce0..96bd25a 100644
--- a/src/usage.py
+++ b/src/usage.py
@@ -79,9 +79,60 @@ def parse_usage_from_text(text: str) -> dict | None:
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
),
"cost_usd": _float(cost),
+ # Telegram live tracker: the model the run actually used. claude
+ # --output-format json reports it under modelUsage (a dict keyed by the
+ # full model id) and/or a top-level "model" field. We keep the FULL name
+ # here; short_model_name() trims it for the tracker. None when unknown.
+ "model": _extract_model(candidate),
}
+def _extract_model(candidate: dict) -> str | None:
+ """Best-effort: pull the model id out of a claude result JSON object.
+
+ Prefers modelUsage (a dict keyed by full model ids, e.g.
+ {"claude-opus-4-8": {...}}) and returns the key with the most output
+ tokens; falls back to a top-level "model" string. Never raises -> None.
+ """
+ try:
+ mu = candidate.get("modelUsage")
+ if isinstance(mu, dict) and mu:
+ def _out(v):
+ try:
+ return int((v or {}).get("outputTokens", 0))
+ except (TypeError, ValueError, AttributeError):
+ return 0
+ best = max(mu.items(), key=lambda kv: _out(kv[1]))
+ if best and best[0]:
+ return str(best[0])
+ model = candidate.get("model")
+ if isinstance(model, str) and model:
+ return model
+ except Exception:
+ pass
+ return None
+
+
+def short_model_name(full: str | None) -> str:
+ """Trim a full model id to a short tag for the tracker.
+
+ 'tokenator/claude-opus-4-8' -> 'opus-4-8'
+ 'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
+ 'claude-opus-4-8' -> 'opus-4-8'
+ Returns '' when full is falsy so callers can omit the ' · ' suffix.
+ """
+ if not full:
+ return ""
+ name = str(full).strip()
+ # Drop any provider prefix up to and including the last '/'.
+ if "/" in name:
+ name = name.rsplit("/", 1)[-1]
+ # Drop a leading 'claude-' marketing prefix.
+ if name.startswith("claude-"):
+ name = name[len("claude-"):]
+ return name
+
+
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
@@ -157,13 +208,15 @@ def record_usage(run_id: int, usage: dict | None):
try:
conn.execute(
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
- "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=? WHERE id=?",
+ "cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
+ "model=COALESCE(?, model) WHERE id=?",
(
usage.get("input_tokens"),
usage.get("output_tokens"),
usage.get("cache_read_tokens"),
usage.get("cache_creation_tokens"),
usage.get("cost_usd"),
+ usage.get("model"),
run_id,
),
)
diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py
index 1eb0f6b..cdcc2e7 100644
--- a/src/webhooks/plane.py
+++ b/src/webhooks/plane.py
@@ -494,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""):
# Insert task into DB
conn = get_db()
conn.execute(
- "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)",
- (plane_id, work_item_id, repo, branch, "analysis", plane_id),
+ "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)",
+ (plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
)
conn.commit()
conn.close()
diff --git a/tests/test_telegram_tracker.py b/tests/test_telegram_tracker.py
new file mode 100644
index 0000000..d563fa3
--- /dev/null
+++ b/tests/test_telegram_tracker.py
@@ -0,0 +1,342 @@
+"""feat/telegram-live-tracker: tests for the live Telegram task tracker.
+
+Covers (per DEV_TASK_TELEGRAM_TRACKER.md):
+ * short_model_name: provider/claude- prefix trimming.
+ * render_task_tracker: per-stage line format (in↓/out↑, model, cost, minutes),
+ the "⏸️ Ревью БРД · твоё время" line, the 💰 totals, and the finish block
+ (⏱️ three times + 🔗/📦).
+ * first message -> sendMessage stores message_id; transition -> editMessageText.
+ * fallback: editMessageText fails -> a NEW message is sent and the id updated.
+ * which alerts go out SEPARATELY (approve-gate / deploy-fail / agent-fail /
+ error) vs which do NOT (QG-pending / agent-start / stage-transition).
+
+Isolated temp DB; no network (httpx is patched).
+"""
+
+import os
+import tempfile
+
+os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
+os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
+
+_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_tracker.db")
+os.environ["ORCH_DB_PATH"] = _test_db
+
+from unittest.mock import MagicMock, patch # noqa: E402
+
+import pytest # noqa: E402
+
+import src.db as db_module # noqa: E402
+from src.db import init_db, get_db # noqa: E402
+from src import notifications as N # noqa: E402
+from src import usage as U # noqa: E402
+
+
+@pytest.fixture(autouse=True)
+def setup_db(monkeypatch):
+ monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
+ if os.path.exists(_test_db):
+ os.unlink(_test_db)
+ init_db()
+ # Re-enable send_telegram (conftest stubs it to a no-op); these tests patch
+ # httpx / the lower-level helpers explicitly per case.
+ yield
+ if os.path.exists(_test_db):
+ os.unlink(_test_db)
+
+
+# --------------------------------------------------------------------------- #
+# helpers to build a task + runs in the DB
+# --------------------------------------------------------------------------- #
+def _mk_task(stage="development", title="\u0422\u0440\u0435\u043a\u0438 \u0441 \u0437\u0443\u043c\u0430 z5",
+ wid="ET-012", brd_start=None, brd_end=None):
+ conn = get_db()
+ cur = conn.execute(
+ "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
+ "brd_review_started_at, brd_review_ended_at) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ ("p1", wid, "enduro-trails", "feature/ET-012-x", stage, title,
+ brd_start, brd_end),
+ )
+ tid = cur.lastrowid
+ conn.commit()
+ conn.close()
+ return tid
+
+
+def _mk_run(task_id, agent, started, finished, in_tok, out_tok,
+ cache_read=0, cache_creation=0, cost=0.0, model=None, exit_code=0):
+ conn = get_db()
+ cur = conn.execute(
+ "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
+ "exit_code, input_tokens, output_tokens, cache_read_tokens, "
+ "cache_creation_tokens, cost_usd, model) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ (task_id, agent, started, finished, exit_code, in_tok, out_tok,
+ cache_read, cache_creation, cost, model),
+ )
+ rid = cur.lastrowid
+ conn.commit()
+ conn.close()
+ return rid
+
+
+# --------------------------------------------------------------------------- #
+# short_model_name
+# --------------------------------------------------------------------------- #
+def test_short_model_name():
+ assert U.short_model_name("tokenator/claude-opus-4-8") == "opus-4-8"
+ assert U.short_model_name("vibecode/claude-sonnet-4.6") == "sonnet-4.6"
+ assert U.short_model_name("claude-opus-4-8") == "opus-4-8"
+ assert U.short_model_name("opus-4-8") == "opus-4-8"
+ assert U.short_model_name(None) == ""
+ assert U.short_model_name("") == ""
+
+
+def test_parse_usage_extracts_model_from_modelusage():
+ blob = (
+ '{"total_cost_usd":0.01,'
+ '"usage":{"input_tokens":10,"output_tokens":5},'
+ '"modelUsage":{"claude-opus-4-8":{"inputTokens":10,"outputTokens":5}}}'
+ )
+ u = U.parse_usage_from_text(blob)
+ assert u["model"] == "claude-opus-4-8"
+
+
+# --------------------------------------------------------------------------- #
+# render_task_tracker
+# --------------------------------------------------------------------------- #
+def test_render_in_progress_stage_lines_and_totals():
+ tid = _mk_task(stage="deploy", brd_start="2026-06-04 10:00:00",
+ brd_end="2026-06-04 10:08:00")
+ # Analysis: 10м, 1.1M in (mostly cache) / 39.6k out, $2.38, opus-4-8
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
+ model="tokenator/claude-opus-4-8")
+ _mk_run(tid, "architect", "2026-06-04 10:08:00", "2026-06-04 10:17:00",
+ in_tok=500, out_tok=34400, cache_read=1_500_000, cost=2.24,
+ model="tokenator/claude-opus-4-8")
+ _mk_run(tid, "developer", "2026-06-04 10:17:00", "2026-06-04 10:28:00",
+ in_tok=400, out_tok=45800, cache_read=8_400_000, cost=7.29,
+ model="tokenator/claude-opus-4-8")
+ _mk_run(tid, "reviewer", "2026-06-04 10:28:00", "2026-06-04 10:31:00",
+ in_tok=300, out_tok=12900, cache_read=1_200_000, cost=1.53,
+ model="vibecode/claude-sonnet-4.6")
+ _mk_run(tid, "tester", "2026-06-04 10:31:00", "2026-06-04 10:36:00",
+ in_tok=200, out_tok=19500, cache_read=1_200_000, cost=1.51,
+ model="vibecode/claude-sonnet-4.6")
+ # deployer started but not finished -> active "идёт" line.
+ _mk_run(tid, "deployer", "2026-06-04 10:36:00", None,
+ in_tok=0, out_tok=0, model=None, exit_code=None)
+
+ text = N.render_task_tracker(tid)
+
+ # Header in-progress
+ assert text.startswith("\U0001f6e0\ufe0f ET-012 \u00b7 \u0422\u0440\u0435\u043a\u0438")
+ # Per-stage format: in↓/out↑ · cost · model
+ assert "\u2705 Analysis" in text
+ assert "10\u043c" in text # analysis duration
+ assert "39.6k\u2191" in text # analysis out
+ assert "$2.38" in text
+ assert "opus-4-8" in text
+ assert "sonnet-4.6" in text # reviewer/tester model
+ # BRD review line (human time, ended)
+ assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
+ assert "\u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" in text
+ # Active stage
+ assert "\U0001f504 Deploy" in text
+ assert "\u0438\u0434\u0451\u0442" in text
+ # Totals line present with 💰
+ assert "\U0001f4b0" in text
+ # In-progress: no final ⏱️ line
+ assert "\u0412\u0441\u0435\u0433\u043e" not in text
+
+
+def test_render_brd_review_waiting_shows_hourglass():
+ tid = _mk_task(stage="analysis", brd_start="2026-06-04 10:00:00",
+ brd_end=None)
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
+ model="tokenator/claude-opus-4-8")
+ text = N.render_task_tracker(tid)
+ assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
+ assert "\u23f3" in text # hourglass while waiting
+
+
+def test_render_done_has_times_and_links():
+ tid = _mk_task(stage="done", brd_start="2026-06-04 10:00:00",
+ brd_end="2026-06-04 10:08:00")
+ # set created/updated to compute wall clock
+ conn = get_db()
+ conn.execute(
+ "UPDATE tasks SET created_at='2026-06-04 09:00:00', "
+ "updated_at='2026-06-04 09:56:00' WHERE id=?", (tid,))
+ conn.commit()
+ conn.close()
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
+ model="tokenator/claude-opus-4-8")
+ _mk_run(tid, "deployer", "2026-06-04 09:50:00", "2026-06-04 09:56:00",
+ in_tok=400, out_tok=22400, cache_read=1_600_000, cost=1.73,
+ model="tokenator/claude-opus-4-8")
+
+ with patch("src.notifications.httpx") as _hx:
+ # No PR found -> just "📦 deployed"
+ _resp = MagicMock(status_code=200)
+ _resp.json.return_value = []
+ _hx.get.return_value = _resp
+ text = N.render_task_tracker(tid)
+
+ assert text.startswith("\U0001f389 ET-012")
+ assert "\u0413\u041e\u0422\u041e\u0412\u041e" in text
+ # ⏱️ with three times
+ assert "\u23f1\ufe0f" in text
+ assert "\u0412\u0441\u0435\u0433\u043e" in text
+ assert "\u0430\u0433\u0435\u043d\u0442\u044b" in text
+ assert "\u0442\u0432\u043e\u0451" in text
+ # 📦 deployed line
+ assert "\U0001f4e6" in text
+
+
+def test_render_escapes_html_in_title():
+ tid = _mk_task(stage="analysis", title="A & B")
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=10, out_tok=5, cost=0.0)
+ text = N.render_task_tracker(tid)
+ assert "<b>" in text
+ assert "&" in text
+
+
+def test_render_omits_model_when_unknown():
+ tid = _mk_task(stage="analysis")
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=10, out_tok=5, cost=0.0, model=None)
+ text = N.render_task_tracker(tid)
+ # No trailing " · " — line ends at cost.
+ line = [l for l in text.splitlines() if l.startswith("\u2705 Analysis")][0]
+ assert line.rstrip().endswith("$0.00")
+
+
+# --------------------------------------------------------------------------- #
+# tracker send / edit / fallback
+# --------------------------------------------------------------------------- #
+def test_first_call_sends_message_and_stores_id(monkeypatch):
+ tid = _mk_task(stage="analysis")
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", None, in_tok=0, out_tok=0,
+ exit_code=None)
+
+ sent = {}
+ def _fake_send(text, disable_notification=False):
+ sent["text"] = text
+ sent["silent"] = disable_notification
+ return 555
+ monkeypatch.setattr(N, "send_telegram", _fake_send)
+ monkeypatch.setattr(N, "edit_telegram", lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not edit on first call")))
+
+ N.update_task_tracker(tid)
+
+ from src.db import get_tracker_message_id
+ assert get_tracker_message_id(tid) == 555
+ assert sent["silent"] is True # tracker is silent
+
+
+def test_second_call_edits_existing_message(monkeypatch):
+ tid = _mk_task(stage="development")
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=10, out_tok=5, cost=0.1)
+ from src.db import set_tracker_message_id
+ set_tracker_message_id(tid, 777)
+
+ edited = {}
+ monkeypatch.setattr(N, "edit_telegram",
+ lambda mid, text: edited.update(mid=mid) or True)
+ monkeypatch.setattr(N, "send_telegram",
+ lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not send when edit succeeds")))
+
+ N.update_task_tracker(tid)
+ assert edited["mid"] == 777
+
+
+def test_fallback_to_new_message_when_edit_fails(monkeypatch):
+ tid = _mk_task(stage="development")
+ _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
+ in_tok=10, out_tok=5, cost=0.1)
+ from src.db import set_tracker_message_id, get_tracker_message_id
+ set_tracker_message_id(tid, 100)
+
+ monkeypatch.setattr(N, "edit_telegram", lambda mid, text: False) # edit fails
+ monkeypatch.setattr(N, "send_telegram", lambda text, disable_notification=False: 200)
+
+ N.update_task_tracker(tid)
+ assert get_tracker_message_id(tid) == 200 # id updated to the new message
+
+
+# --------------------------------------------------------------------------- #
+# which alerts are SEPARATE vs tracker-only
+# --------------------------------------------------------------------------- #
+def test_approve_gate_sends_separate_message_and_starts_brd_clock(monkeypatch):
+ tid = _mk_task(stage="analysis")
+ calls = []
+ monkeypatch.setattr(N, "send_telegram",
+ lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
+ monkeypatch.setattr(N, "update_task_tracker", lambda task_id: None)
+
+ N.notify_approve_requested(tid)
+
+ # exactly one SEPARATE (notifying) send for the approve gate
+ assert len(calls) == 1
+ assert calls[0][1] is False # notifying
+ assert "Approved" in calls[0][0]
+ # BRD clock started
+ conn = get_db()
+ row = conn.execute("SELECT brd_review_started_at FROM tasks WHERE id=?", (tid,)).fetchone()
+ conn.close()
+ assert row[0] is not None
+
+
+def test_error_sends_separate_message(monkeypatch):
+ tid = _mk_task(stage="development")
+ calls = []
+ monkeypatch.setattr(N, "send_telegram",
+ lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
+ N.notify_error(tid, "boom")
+ assert len(calls) == 1
+ assert calls[0][1] is False # notifying
+ assert "ERROR" in calls[0][0]
+
+
+def test_stage_change_does_not_send_separate_message(monkeypatch):
+ tid = _mk_task(stage="development")
+ sent = []
+ monkeypatch.setattr(N, "send_telegram",
+ lambda text, disable_notification=False: sent.append(text) or 1)
+ # tracker refresh is allowed (edit/send silent) but must NOT use send_telegram
+ # for a separate notification; stub update to isolate.
+ refreshed = []
+ monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
+
+ N.notify_stage_change(tid, "development", "review")
+ assert sent == [] # no separate message
+ assert refreshed == [tid] # tracker refreshed instead
+
+
+def test_agent_started_does_not_send_separate_message(monkeypatch):
+ tid = _mk_task(stage="analysis")
+ sent = []
+ monkeypatch.setattr(N, "send_telegram",
+ lambda text, disable_notification=False: sent.append(text) or 1)
+ refreshed = []
+ monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
+
+ N.notify_agent_started(1, "analyst", tid)
+ assert sent == []
+ assert refreshed == [tid]
+
+
+def test_qg_failure_does_not_send_separate_message(monkeypatch):
+ tid = _mk_task(stage="development")
+ sent = []
+ monkeypatch.setattr(N, "send_telegram",
+ lambda text, disable_notification=False: sent.append(text) or 1)
+ N.notify_qg_failure(tid, "development", "check_ci_green", "CI state: pending")
+ assert sent == [] # QG-pending is log-only, never a separate ping