"""Notifications and logging for orchestrator events. feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram messages per task (agent start / finish / stage transition / QG-pending / tech noise), the orchestrator now maintains ONE live tracker message per task that is edited in place (editMessageText) on every stage transition. Only events that NEED Slava's attention are sent as SEPARATE, notifying messages: * approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved * deploy failed / rolled back — send_telegram from launcher/engine * agent failed (exit_code != 0) — send_telegram from launcher * task error (notify_error) The tracker itself is edited SILENTLY (disable_notification: true). Stage-change, agent-start, agent-finish and QG-pending no longer emit their own messages — they just refresh the tracker (or are log-only). """ import html import logging import httpx logger = logging.getLogger("orchestrator") # Lazy import to avoid circular imports at module level _settings = None def _get_settings(): global _settings if _settings is None: from .config import settings _settings = settings return _settings # --------------------------------------------------------------------------- # # Low-level Telegram primitives # --------------------------------------------------------------------------- # def send_telegram(text: str, disable_notification: bool = False): """Send a notification to Telegram. Fire-and-forget, never raises. Returns the Telegram message_id on success, else None (so callers that want to track the message — the tracker — can store it; legacy callers ignore it). """ s = _get_settings() if not s.telegram_bot_token or not s.telegram_chat_id: return None try: url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage" resp = httpx.post( url, json={ "chat_id": s.telegram_chat_id, "text": text, "parse_mode": "HTML", "disable_notification": disable_notification, }, timeout=5, ) data = resp.json() if data.get("ok"): return data["result"]["message_id"] except Exception: pass # Never crash orchestrator due to notification failure return None # edit_telegram outcome codes -> let update_task_tracker decide what to do: # "ok" edit applied -> nothing else to do # "not_modified" Telegram says text is identical (400 "message is not # modified" / "exactly the same") -> success, NO new message # "gone" original message can't be edited (deleted / too old / # invalid id) -> caller must fall back to a NEW message # "failed" transient failure (network / timeout / 5xx / unknown 400) # -> caller must NOT send a new message (avoid duplicates) EDIT_OK = "ok" EDIT_NOT_MODIFIED = "not_modified" EDIT_GONE = "gone" EDIT_FAILED = "failed" # Telegram error descriptions that mean the message is permanently un-editable # (it is gone / orphaned) -> fall back to a fresh message. _GONE_MARKERS = ( "message to edit not found", "message can't be edited", "message_id_invalid", ) # Telegram "nothing changed" -> treat as success, never a duplicate. _NOT_MODIFIED_MARKERS = ( "message is not modified", "exactly the same", ) def edit_telegram(message_id: int, text: str) -> str: """Edit an existing Telegram message. Never raises. Returns a distinguishable outcome (see EDIT_* constants) so the caller can tell apart "all good" / "nothing changed" / "message gone" / "transient failure" and only fall back to a NEW message when the original is truly gone. """ s = _get_settings() if not s.telegram_bot_token or not s.telegram_chat_id: return EDIT_FAILED try: url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText" resp = httpx.post( url, json={ "chat_id": s.telegram_chat_id, "message_id": message_id, "text": text, "parse_mode": "HTML", }, timeout=5, ) data = resp.json() if data.get("ok"): return EDIT_OK # ok:false -> inspect the description to classify the 400. desc = str(data.get("description") or "").lower() if any(m in desc for m in _NOT_MODIFIED_MARKERS): # Text is identical between transitions (e.g. repeat review cycle # renders the same line). Nothing to do, NOT a duplicate. logger.debug( f"edit_telegram(mid={message_id}): not modified, skipping" ) return EDIT_NOT_MODIFIED if any(m in desc for m in _GONE_MARKERS): logger.warning( f"edit_telegram(mid={message_id}): message gone ({desc!r}), " f"will fall back to a new message" ) return EDIT_GONE # Unknown 400 / other non-ok -> transient/unknown, do NOT duplicate. logger.warning( f"edit_telegram(mid={message_id}): edit failed ({desc!r})" ) return EDIT_FAILED except Exception as e: # Network / timeout / 5xx -> transient, do NOT duplicate. logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}") return EDIT_FAILED def _get_work_item_id(task_id: int) -> str: """Get work_item_id from DB by task_id.""" try: from .db import get_db conn = get_db() row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone() conn.close() return row[0] if row and row[0] else f"task-{task_id}" except Exception: return f"task-{task_id}" # --------------------------------------------------------------------------- # # Live task tracker # --------------------------------------------------------------------------- # # Pipeline stages shown in the tracker, in order, with their display label and # the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT # an agent stage — it is the human approve gate rendered between Analysis and # Architecture from the task's brd_review_* timestamps. _TRACKER_STAGES = [ ("analysis", "Analysis", "analyst"), ("architecture", "Architecture", "architect"), ("development", "Development", "developer"), ("review", "Review", "reviewer"), ("testing", "Testing", "tester"), ("deploy", "Deploy", "deployer"), ] # Map a pipeline stage -> the agent that is RUNNING while the task sits in it. # (development is entered after architecture finishes, etc.) Used to render the # "🔄 … идёт" line for the currently-active stage. _BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД" _STAGE_ACTIVE_AGENT = { "analysis": "analyst", "architecture": "architect", "development": "developer", "review": "reviewer", "testing": "tester", "deploy": "deployer", } def _fmt_minutes(seconds) -> str: """Render a duration in whole minutes: 0..59s -> '<1м', else 'м'.""" try: seconds = int(seconds or 0) except (TypeError, ValueError): seconds = 0 if seconds <= 0: return "0м" if seconds < 60: return "<1м" return f"{seconds // 60}\u043c" def _parse_sql_ts(ts): """Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None.""" if not ts: return None from datetime import datetime, timezone for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc) except (ValueError, TypeError): continue return None def _duration_seconds(started, finished): """Seconds between two SQL timestamps; None if either is missing/unparseable.""" a = _parse_sql_ts(started) b = _parse_sql_ts(finished) if a is None or b is None: return None return max(int((b - a).total_seconds()), 0) def render_task_tracker(task_id: int) -> str: """Build the full live-tracker text for a task from the DB (stateless render). Pulls the task header (work_item_id, title, stage), every agent_runs row, and the BRD-review timestamps, then renders: - one '✅ · ↓/↑ · · ' line per finished stage (latest run per stage), - the '⏸️ Ревью БРД · твоё время[ ⏳]' line between Analysis/Architecture, - a '🔄 … идёт' line for the active (in-progress) stage, - the '💰 ↓ / ↑ · ' totals, - on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line. Never raises (returns a minimal fallback string on error). """ from .db import get_db from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name try: conn = get_db() task = conn.execute( "SELECT id, work_item_id, title, stage, created_at, updated_at, " "brd_review_started_at, brd_review_ended_at " "FROM tasks WHERE id=?", (task_id,), ).fetchone() if not task: conn.close() return f"task-{task_id}" runs = conn.execute( "SELECT agent, started_at, finished_at, exit_code, input_tokens, " "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model " "FROM agent_runs WHERE task_id=? ORDER BY id ASC", (task_id,), ).fetchall() conn.close() except Exception as e: logger.warning(f"render_task_tracker({task_id}) DB error: {e}") return f"task-{task_id}" work_item_id = task["work_item_id"] or f"task-{task_id}" title = task["title"] or work_item_id stage = task["stage"] or "created" done = stage == "done" # Latest completed run per agent (a stage may have multiple runs on retry; # we show the most recent FINISHED, successful run for the stage line). last_done = {} agent_runs_by_agent = {} for r in runs: agent_runs_by_agent.setdefault(r["agent"], []).append(r) if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None): last_done[r["agent"]] = r # Totals across ALL runs (every input/output token + cost counts). total_in = 0 total_out = 0 total_cost = 0.0 agent_seconds = 0 for r in runs: usage = { "input_tokens": r["input_tokens"], "cache_read_tokens": r["cache_read_tokens"], "cache_creation_tokens": r["cache_creation_tokens"], } total_in += _input_total(usage) total_out += int(r["output_tokens"] or 0) total_cost += float(r["cost_usd"] or 0.0) d = _duration_seconds(r["started_at"], r["finished_at"]) if d is not None: agent_seconds += d esc_title = html.escape(title) header = ( f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e" if done else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}" ) bar = "\u2501" * 22 lines = [header, bar] def _stage_line(label, run): usage = { "input_tokens": run["input_tokens"], "cache_read_tokens": run["cache_read_tokens"], "cache_creation_tokens": run["cache_creation_tokens"], } in_tok = fmt_tokens(_input_total(usage)) out_tok = fmt_tokens(run["output_tokens"]) cost = fmt_cost(run["cost_usd"]) dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"])) model = short_model_name(run["model"]) model_suffix = f" \u00b7 {model}" if model else "" return ( f"\u2705 {label:<13} {dur} \u00b7 " f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}" ) # BRD review line: between Analysis and Architecture, only once Analysis has # produced a run (i.e. the gate is live). Time = human review delta. brd_started = task["brd_review_started_at"] brd_ended = task["brd_review_ended_at"] review_seconds = _duration_seconds(brd_started, brd_ended) for stage_key, label, agent in _TRACKER_STAGES: run = last_done.get(agent) # The stage is "in progress" only when it is the task's current stage AND # there is an unfinished run for its agent (the agent is actually still # working). A finished run with no in-flight run -> show the \u2705 result, # even if the task still sits in that stage (just-finished snapshot). agent_runs = agent_runs_by_agent.get(agent, []) has_inflight = any(ar["finished_at"] is None for ar in agent_runs) is_active_stage = ( _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key and (has_inflight or run is None) ) if is_active_stage: # Live "\U0001f504 ... \u0438\u0434\u0451\u0442" line. Count how many times THIS stage's # agent has run for this task; a 2nd+ run means we're re-doing the # stage (e.g. review->development->review), so show "\u043f\u043e\u043f\u044b\u0442\u043a\u0430 N" # to make the text change between cycles and to honestly show Slava # the stage is being re-worked. attempt = len(agent_runs) if attempt >= 2: lines.append( f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} " f"\u2026 \u0438\u0434\u0451\u0442" ) else: lines.append( f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442" ) elif run is not None: lines.append(_stage_line(label, run)) # else: not started yet -> not shown. # Insert the BRD review line right after Analysis. if stage_key == "analysis" and brd_started: brd_label = f"{_BRD_LABEL:<13}" if review_seconds is not None: dur = _fmt_minutes(review_seconds) lines.append( f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" ) else: # Still waiting on the human (ended not stamped yet). from datetime import datetime, timezone start_dt = _parse_sql_ts(brd_started) waited = None if start_dt is not None: waited = int( (datetime.now(timezone.utc) - start_dt).total_seconds() ) dur = _fmt_minutes(waited) if waited is not None else "\u2026" lines.append( f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3" ) lines.append(bar) lines.append( f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 " f"{fmt_cost(total_cost)}" ) if done: wall = _duration_seconds(task["created_at"], task["updated_at"]) wall_str = _fmt_minutes(wall) if wall is not None else "?" review_str = _fmt_minutes(review_seconds) if review_seconds else "0м" lines.append( f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 " f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 " f"\u0442\u0432\u043e\u0451 {review_str}" ) link = _done_link(task_id, task["work_item_id"]) if link: lines.append(link) return "\n".join(lines) def _done_link(task_id: int, work_item_id) -> str | None: """Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None.""" try: from .config import settings from .db import get_db conn = get_db() row = conn.execute( "SELECT repo, branch FROM tasks WHERE id=?", (task_id,) ).fetchone() conn.close() if not row: return None repo, branch = row["repo"], row["branch"] pr_part = None try: owner = settings.gitea_owner headers = {"Authorization": f"token {settings.gitea_token}"} resp = httpx.get( f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls", params={"state": "all", "head": branch}, headers=headers, timeout=5, ) if resp.status_code == 200: prs = resp.json() if prs: pr_part = f"\U0001f517 PR #{prs[0].get('number')}" except Exception: pr_part = None parts = [] if pr_part: parts.append(pr_part) parts.append("\U0001f4e6 deployed") return " \u00b7 ".join(parts) except Exception: return None def update_task_tracker(task_id: int): """Render + push the live tracker for a task. Never raises. First call (no stored tracker_message_id): sendMessage (silent) and store the returned message_id. Subsequent calls: editMessageText the stored message. A NEW message is sent ONLY when the original is truly gone (deleted / too old / invalid id). On "not modified" (text unchanged) or transient failures (network / timeout / 5xx / unknown 400) we do NOT send a new message — that is exactly what produced duplicate trackers and orphaned (lagging) messages. The tracker is always sent with disable_notification so it never pings — only the dedicated alert helpers ping. """ try: from .db import get_tracker_message_id, set_tracker_message_id text = render_task_tracker(task_id) mid = get_tracker_message_id(task_id) if mid is not None: result = edit_telegram(mid, text) if result in (EDIT_OK, EDIT_NOT_MODIFIED): # Edited in place (or nothing to change) -> done, no duplicate. return if result == EDIT_FAILED: # Transient -> don't duplicate; tracker redraws next transition. logger.debug( f"update_task_tracker({task_id}): edit failed transiently, " f"keeping message {mid}" ) return # result == EDIT_GONE -> the stored message is gone; fall through # to send a fresh one and re-point tracker_message_id at it. new_mid = send_telegram(text, disable_notification=True) if new_mid is not None: set_tracker_message_id(task_id, new_mid) except Exception as e: logger.warning(f"update_task_tracker({task_id}) failed: {e}") # --------------------------------------------------------------------------- # # Stage / agent lifecycle notifications (now tracker-only, no separate message) # --------------------------------------------------------------------------- # def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None): """Log a stage transition and refresh the live tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" if agent: msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" logger.info(msg) update_task_tracker(task_id) def notify_agent_started(run_id: int, agent: str, task_id: int): """Log an agent launch and refresh the tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})") if task_id: update_task_tracker(task_id) def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None): """Log agent completion and refresh the tracker (no separate message). The agent-FAILED alert (exit_code != 0) is still sent separately by the launcher via send_telegram; this helper itself only logs + refreshes. """ work_item_id = _get_work_item_id(task_id) if task_id else "?" if exit_code == 0: dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else "" msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}" elif exit_code == -9: msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)" else: msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})" logger.info(msg) if task_id: update_task_tracker(task_id) def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None): """Log a QG check result (NO separate Telegram message: QG-pending is noise). Kept for callers; QG outcomes are log-only now and reflected by the tracker through the resulting stage transition. """ work_item_id = _get_work_item_id(task_id) if passed: logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed") else: logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") def notify_qg_failure(task_id: int, stage: str, check: str, reason: str): """Log a QG check failure (log-only). QG-pending / QG-failed are NOT pinged as separate messages anymore (they are not actionable for Slava). Real rollbacks/deploy-fails are alerted by their own dedicated send_telegram calls in the engine/launcher. """ work_item_id = _get_work_item_id(task_id) logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") def notify_approve_requested(task_id: int): """ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved. Also starts the BRD-review clock and refreshes the tracker so the '⏸️ Ревью БРД · твоё время ⏳' line appears. """ work_item_id = _get_work_item_id(task_id) try: from .db import mark_brd_review_started mark_brd_review_started(task_id) except Exception as e: logger.warning(f"notify_approve_requested: brd clock start failed: {e}") msg = ( f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. " f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved " f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f." ) logger.info(msg) update_task_tracker(task_id) send_telegram(msg) # separate, notifying def notify_done(task_id: int): """Task completion: refresh the tracker to its final ГОТОВО form (no separate ping).""" work_item_id = _get_work_item_id(task_id) logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!") update_task_tracker(task_id) def notify_error(task_id: int, error: str): """ALERT (separate, notifying): task error.""" work_item_id = _get_work_item_id(task_id) if task_id else "system" msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}" logger.error(msg) send_telegram(msg) # separate, notifying