"""Notifications and logging for orchestrator events.

feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
messages per task (agent start / finish / stage transition / QG-pending / tech
noise), the orchestrator now maintains ONE live tracker message per task that
is edited in place (editMessageText) on every stage transition.

Only events that NEED Slava's attention are sent as SEPARATE, notifying
messages:
  * approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
  * deploy failed / rolled back — send_telegram from launcher/engine
  * agent failed (exit_code != 0) — send_telegram from launcher
  * task error (notify_error)

The tracker itself is sent SILENTLY (disable_notification: true) and then
edited in place. Stage-change, agent-start, agent-finish and QG-pending no
longer emit their own messages — they just refresh the tracker (or are
log-only).
"""

import html
import logging

import httpx

logger = logging.getLogger("orchestrator")

# Lazy import to avoid circular imports at module level
_settings = None


def _get_settings():
    """Return the project settings object, importing it on first use.

    The import is deferred (and the result cached in the module-level
    ``_settings``) to avoid a circular import at module load time.
    """
    global _settings
    if _settings is None:
        from .config import settings
        _settings = settings
    return _settings


# --------------------------------------------------------------------------- #
# Low-level Telegram primitives
# --------------------------------------------------------------------------- #

def send_telegram(text: str, disable_notification: bool = False) -> int | None:
    """Send a notification to Telegram. Fire-and-forget, never raises.

    The message is sent with ``parse_mode=HTML``, so ``text`` must already be
    valid Telegram HTML (callers escape dynamic parts with ``html.escape``).

    Args:
        text: Message body (Telegram HTML).
        disable_notification: When True the message is delivered silently.

    Returns:
        The Telegram ``message_id`` on success, else None (missing
        credentials, network failure, or Telegram answered ``ok: false``).
        Callers that want to track the message — the tracker — can store it;
        legacy callers ignore it.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        return None
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_notification": disable_notification,
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return data["result"]["message_id"]
        # ok:false (e.g. an HTML parse error) used to vanish silently; log it
        # so a rejected alert is at least visible in the orchestrator log.
        desc = str(data.get("description") or "")
        logger.warning(f"send_telegram: message rejected ({desc!r})")
    except Exception as e:
        # Never crash orchestrator due to notification failure — log and move
        # on, mirroring delete_telegram / edit_telegram.
        logger.warning(f"send_telegram: transient error: {e}")
    return None


# Telegram error descriptions that mean a deleteMessage target is already gone /
# can't be deleted (>48h, already deleted, invalid id). Treated as "no longer our
# problem" -> the caller proceeds to send a fresh card. NOT a transient failure.
_DELETE_GONE_MARKERS = (
    "message to delete not found",
    "message can't be deleted",
    "message_id_invalid",
)


def delete_telegram(message_id: int) -> bool:
    """Delete a Telegram message. Never raises.

    Returns True if the message is gone after the call (deleted now, OR
    Telegram says it's already not there / can't be deleted -> treat as "no
    longer our problem", caller proceeds to send a fresh card).

    Returns False when credentials are missing, or on a transient failure
    (network / timeout / 5xx / unknown error) where the old message may still
    be alive.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        # No creds -> nothing was deleted; mirror the other helpers' no-op path.
        return False
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/deleteMessage"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "message_id": message_id,
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return True
        # ok:false -> classify. "Already gone / can't delete" is an expected,
        # non-transient outcome (>48h, already deleted) -> the old message is no
        # longer there, caller should still send a fresh card.
        desc = str(data.get("description") or "").lower()
        if any(m in desc for m in _DELETE_GONE_MARKERS):
            logger.debug(
                f"delete_telegram(mid={message_id}): already gone ({desc!r})"
            )
            return True
        # Unknown 400 / 5xx -> transient; the old message may still be alive.
        logger.warning(
            f"delete_telegram(mid={message_id}): delete failed ({desc!r})"
        )
        return False
    except Exception as e:
        # Network / timeout -> transient; old message may still be alive.
        logger.warning(f"delete_telegram(mid={message_id}): transient error: {e}")
        return False


# edit_telegram outcome codes -> let update_task_tracker decide what to do:
#   "ok"            edit applied -> nothing else to do
#   "not_modified"  Telegram says text is identical (400 "message is not
#                   modified" / "exactly the same") -> success, NO new message
#   "gone"          original message can't be edited (deleted / too old /
#                   invalid id) -> caller must fall back to a NEW message
#   "failed"        transient failure (network / timeout / 5xx / unknown 400)
#                   -> caller must NOT send a new message (avoid duplicates)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"

# Telegram error descriptions that mean the message is permanently un-editable
# (it is gone / orphaned) -> fall back to a fresh message.
_GONE_MARKERS = (
    "message to edit not found",
    "message can't be edited",
    "message_id_invalid",
)

# Telegram "nothing changed" -> treat as success, never a duplicate.
_NOT_MODIFIED_MARKERS = (
    "message is not modified",
    "exactly the same",
)


def edit_telegram(message_id: int, text: str) -> str:
    """Edit an existing Telegram message. Never raises.

    Returns a distinguishable outcome (see EDIT_* constants) so the caller can
    tell apart "all good" / "nothing changed" / "message gone" / "transient
    failure" and only fall back to a NEW message when the original is truly
    gone. Missing credentials are reported as EDIT_FAILED.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        return EDIT_FAILED
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "message_id": message_id,
                "text": text,
                "parse_mode": "HTML",
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return EDIT_OK
        # ok:false -> inspect the description to classify the 400.
        desc = str(data.get("description") or "").lower()
        if any(m in desc for m in _NOT_MODIFIED_MARKERS):
            # Text is identical between transitions (e.g. repeat review cycle
            # renders the same line). Nothing to do, NOT a duplicate.
            logger.debug(
                f"edit_telegram(mid={message_id}): not modified, skipping"
            )
            return EDIT_NOT_MODIFIED
        if any(m in desc for m in _GONE_MARKERS):
            logger.warning(
                f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
                f"will fall back to a new message"
            )
            return EDIT_GONE
        # Unknown 400 / other non-ok -> transient/unknown, do NOT duplicate.
        logger.warning(
            f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
        )
        return EDIT_FAILED
    except Exception as e:
        # Network / timeout / 5xx -> transient, do NOT duplicate.
        logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
        return EDIT_FAILED


def _get_work_item_id(task_id: int) -> str:
    """Get work_item_id from DB by task_id. Never raises.

    Falls back to ``"task-<task_id>"`` when the row is missing, the column is
    empty, or the DB lookup fails for any reason.
    """
    try:
        from .db import get_db
        conn = get_db()
        try:
            row = conn.execute(
                "SELECT work_item_id FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            # Close even when the query raises, so the connection never leaks.
            conn.close()
        return row[0] if row and row[0] else f"task-{task_id}"
    except Exception:
        return f"task-{task_id}"


# --------------------------------------------------------------------------- #
# Live task tracker
# --------------------------------------------------------------------------- #

# Pipeline stages shown in the tracker, in order, with their display label and
# the agent whose agent_runs rows describe that stage's work.
"Ревью БРД" is NOT # an agent stage — it is the human approve gate rendered between Analysis and # Architecture from the task's brd_review_* timestamps. # ORCH-042 (BR-11): display-labels are Russian. Stage KEYS (analysis, …) and # agent names (analyst, …) are NOT touched — they are wired to # _STAGE_ACTIVE_AGENT, last_done and the DB. Only the 2nd tuple element changed. _TRACKER_STAGES = [ ("analysis", "Анализ", "analyst"), # Анализ ("architecture", "Архитектура", "architect"), # Архитектура ("development", "Разработка", "developer"), # Разработка ("review", "Код ревью", "reviewer"), # Код ревью ("testing", "Тестирование", "tester"), # Тестирование ("deploy", "Внедрение", "deployer"), # Внедрение ] # Map a pipeline stage -> the agent that is RUNNING while the task sits in it. # (development is entered after architecture finishes, etc.) Used to render the # "🔄 … идёт" line for the currently-active stage. # ORCH-042 (BR-9): "Подтверждение BRD" (was "Ревью БРД"). _BRD_LABEL = "Подтверждение BRD" _STAGE_ACTIVE_AGENT = { "analysis": "analyst", "architecture": "architect", "development": "developer", "review": "reviewer", "testing": "tester", "deploy": "deployer", } def _fmt_minutes(seconds) -> str: """Render a duration in whole minutes: 0..59s -> '<1м', else 'м'.""" try: seconds = int(seconds or 0) except (TypeError, ValueError): seconds = 0 if seconds <= 0: return "0м" if seconds < 60: return "<1м" return f"{seconds // 60}\u043c" def _parse_sql_ts(ts): """Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None.""" if not ts: return None from datetime import datetime, timezone for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc) except (ValueError, TypeError): continue return None def _duration_seconds(started, finished): """Seconds between two SQL timestamps; None if either is missing/unparseable.""" a = _parse_sql_ts(started) b = _parse_sql_ts(finished) if a is None or b 
is None: return None return max(int((b - a).total_seconds()), 0) def render_task_tracker(task_id: int) -> str: """Build the full live-tracker text for a task from the DB (stateless render). Pulls the task header (work_item_id, title, stage), every agent_runs row, and the BRD-review timestamps, then renders: - one '✅ · ↓/↑ · · ' line per finished stage (latest run per stage), - the '✅/⏸️ Подтверждение BRD · твоё время[ ⏳]' line between Analysis/Architecture (✅ once the approve-gate passed, ⏸️+⏳ while waiting), - a '🔄 … идёт' line for the active (in-progress) stage, - the '💰 ↓ / ↑ · ' totals, - on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line. Never raises (returns a minimal fallback string on error). """ from .db import get_db from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name try: conn = get_db() task = conn.execute( "SELECT id, work_item_id, title, stage, created_at, updated_at, " "brd_review_started_at, brd_review_ended_at " "FROM tasks WHERE id=?", (task_id,), ).fetchone() if not task: conn.close() return f"task-{task_id}" runs = conn.execute( "SELECT agent, started_at, finished_at, exit_code, input_tokens, " "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model " "FROM agent_runs WHERE task_id=? ORDER BY id ASC", (task_id,), ).fetchall() conn.close() except Exception as e: logger.warning(f"render_task_tracker({task_id}) DB error: {e}") return f"task-{task_id}" work_item_id = task["work_item_id"] or f"task-{task_id}" title = task["title"] or work_item_id stage = task["stage"] or "created" done = stage == "done" # Latest completed run per agent (a stage may have multiple runs on retry; # we show the most recent FINISHED, successful run for the stage line). 
last_done = {} agent_runs_by_agent = {} for r in runs: agent_runs_by_agent.setdefault(r["agent"], []).append(r) if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None): last_done[r["agent"]] = r # Totals across ALL runs (every input/output token + cost counts). total_in = 0 total_out = 0 total_cost = 0.0 agent_seconds = 0 for r in runs: usage = { "input_tokens": r["input_tokens"], "cache_read_tokens": r["cache_read_tokens"], "cache_creation_tokens": r["cache_creation_tokens"], } total_in += _input_total(usage) total_out += int(r["output_tokens"] or 0) total_cost += float(r["cost_usd"] or 0.0) d = _duration_seconds(r["started_at"], r["finished_at"]) if d is not None: agent_seconds += d esc_title = html.escape(title) header = ( f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e" if done else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}" ) bar = "\u2501" * 22 lines = [header, bar] def _stage_line(label, run): usage = { "input_tokens": run["input_tokens"], "cache_read_tokens": run["cache_read_tokens"], "cache_creation_tokens": run["cache_creation_tokens"], } in_tok = fmt_tokens(_input_total(usage)) out_tok = fmt_tokens(run["output_tokens"]) cost = fmt_cost(run["cost_usd"]) dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"])) model = short_model_name(run["model"]) model_suffix = f" \u00b7 {model}" if model else "" return ( f"\u2705 {label:<13} {dur} \u00b7 " f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}" ) # BRD review line: between Analysis and Architecture, only once Analysis has # produced a run (i.e. the gate is live). Time = human review delta. 
brd_started = task["brd_review_started_at"] brd_ended = task["brd_review_ended_at"] review_seconds = _duration_seconds(brd_started, brd_ended) for stage_key, label, agent in _TRACKER_STAGES: run = last_done.get(agent) # The stage is "in progress" only when it is the task's current stage AND # there is an unfinished run for its agent (the agent is actually still # working). A finished run with no in-flight run -> show the \u2705 result, # even if the task still sits in that stage (just-finished snapshot). agent_runs = agent_runs_by_agent.get(agent, []) has_inflight = any(ar["finished_at"] is None for ar in agent_runs) is_active_stage = ( _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key and (has_inflight or run is None) ) if is_active_stage: # Live "\U0001f504 ... \u0438\u0434\u0451\u0442" line. Count how many times THIS stage's # agent has run for this task; a 2nd+ run means we're re-doing the # stage (e.g. review->development->review), so show "\u043f\u043e\u043f\u044b\u0442\u043a\u0430 N" # to make the text change between cycles and to honestly show Slava # the stage is being re-worked. attempt = len(agent_runs) if attempt >= 2: lines.append( f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} " f"\u2026 \u0438\u0434\u0451\u0442" ) else: lines.append( f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442" ) elif run is not None: lines.append(_stage_line(label, run)) # else: not started yet -> not shown. # Insert the BRD review line right after Analysis. if stage_key == "analysis" and brd_started: brd_label = f"{_BRD_LABEL:<13}" if review_seconds is not None: # ORCH-042 (BR-10): approve-gate passed -> \u2705 (was \u23f8\ufe0f). The # still-waiting branch below keeps \u23f8\ufe0f + \u23f3 unchanged. dur = _fmt_minutes(review_seconds) lines.append( f"\u2705 {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" ) else: # Still waiting on the human (ended not stamped yet). 
from datetime import datetime, timezone start_dt = _parse_sql_ts(brd_started) waited = None if start_dt is not None: waited = int( (datetime.now(timezone.utc) - start_dt).total_seconds() ) dur = _fmt_minutes(waited) if waited is not None else "\u2026" lines.append( f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3" ) lines.append(bar) lines.append( f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 " f"{fmt_cost(total_cost)}" ) if done: wall = _duration_seconds(task["created_at"], task["updated_at"]) wall_str = _fmt_minutes(wall) if wall is not None else "?" review_str = _fmt_minutes(review_seconds) if review_seconds else "0м" lines.append( f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 " f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 " f"\u0442\u0432\u043e\u0451 {review_str}" ) link = _done_link(task_id, task["work_item_id"]) if link: lines.append(link) return "\n".join(lines) def _done_link(task_id: int, work_item_id) -> str | None: """Build the final '🔗 PR #n · 📦 Внедрено' line. 
Never raises -> None.""" try: from .config import settings from .db import get_db conn = get_db() row = conn.execute( "SELECT repo, branch FROM tasks WHERE id=?", (task_id,) ).fetchone() conn.close() if not row: return None repo, branch = row["repo"], row["branch"] pr_part = None try: owner = settings.gitea_owner headers = {"Authorization": f"token {settings.gitea_token}"} resp = httpx.get( f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls", params={"state": "all", "head": branch}, headers=headers, timeout=5, ) if resp.status_code == 200: prs = resp.json() if prs: pr_part = f"\U0001f517 PR #{prs[0].get('number')}" except Exception: pr_part = None parts = [] if pr_part: parts.append(pr_part) parts.append("\U0001f4e6 Внедрено") # ORCH-042 (BR-12): was "deployed" return " \u00b7 ".join(parts) except Exception: return None def update_task_tracker(task_id: int): """Render + push the live tracker for a task. Never raises. Two modes, selected by Settings.tracker_mode (env ORCH_TRACKER_MODE), resolved case-insensitively here; anything other than "bump" -> "edit" (ORCH-042). Both keep the "one card per task" invariant. edit (DEFAULT): First call (no stored tracker_message_id): sendMessage (silent) and store the returned message_id. Subsequent calls: editMessageText the stored message. A NEW message is sent ONLY when the original is truly gone (deleted / too old / invalid id). On "not modified" (text unchanged) or transient failures (network / timeout / 5xx / unknown 400) we do NOT send a new message — that is exactly what produced duplicate trackers and orphaned (lagging) messages. bump (ORCH-042): The card is re-created at the BOTTOM of the chat on every update: best-effort delete_telegram(old_id) (its result NEVER blocks the send), then sendMessage (silent), then re-point tracker_message_id to the new id — but ONLY on a successful send (new_mid is not None), so a transient send failure never wipes the pointer to None. 
At most ONE new message is sent per call -> no duplicates within a call. The tracker is always sent with disable_notification so it never pings — only the dedicated alert helpers ping. """ try: from .db import get_tracker_message_id, set_tracker_message_id text = render_task_tracker(task_id) mode = (_get_settings().tracker_mode or "edit").strip().lower() mid = get_tracker_message_id(task_id) if mode == "bump": # bump: one card, always at the bottom (delete + send + repoint). if mid is not None: # best-effort; result does NOT gate the send (BR-6). delete_telegram(mid) new_mid = send_telegram(text, disable_notification=True) if new_mid is not None: set_tracker_message_id(task_id, new_mid) # send returned None (no creds / transient) -> leave mid untouched; # no duplicate within this call, redraws on the next transition. return # mode == "edit" (DEFAULT): existing behaviour, unchanged. if mid is not None: result = edit_telegram(mid, text) if result in (EDIT_OK, EDIT_NOT_MODIFIED): # Edited in place (or nothing to change) -> done, no duplicate. return if result == EDIT_FAILED: # Transient -> don't duplicate; tracker redraws next transition. logger.debug( f"update_task_tracker({task_id}): edit failed transiently, " f"keeping message {mid}" ) return # result == EDIT_GONE -> the stored message is gone; fall through # to send a fresh one and re-point tracker_message_id at it. 
new_mid = send_telegram(text, disable_notification=True) if new_mid is not None: set_tracker_message_id(task_id, new_mid) except Exception as e: logger.warning(f"update_task_tracker({task_id}) failed: {e}") # --------------------------------------------------------------------------- # # Stage / agent lifecycle notifications (now tracker-only, no separate message) # --------------------------------------------------------------------------- # def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None): """Log a stage transition and refresh the live tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" if agent: msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" logger.info(msg) update_task_tracker(task_id) def notify_agent_started(run_id: int, agent: str, task_id: int): """Log an agent launch and refresh the tracker (no separate message).""" work_item_id = _get_work_item_id(task_id) logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})") if task_id: update_task_tracker(task_id) def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None): """Log agent completion and refresh the tracker (no separate message). The agent-FAILED alert (exit_code != 0) is still sent separately by the launcher via send_telegram; this helper itself only logs + refreshes. """ work_item_id = _get_work_item_id(task_id) if task_id else "?" 
if exit_code == 0: dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else "" msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}" elif exit_code == -9: msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)" else: msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})" logger.info(msg) if task_id: update_task_tracker(task_id) def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None): """Log a QG check result (NO separate Telegram message: QG-pending is noise). Kept for callers; QG outcomes are log-only now and reflected by the tracker through the resulting stage transition. """ work_item_id = _get_work_item_id(task_id) if passed: logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed") else: logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") def notify_qg_failure(task_id: int, stage: str, check: str, reason: str): """Log a QG check failure (log-only). QG-pending / QG-failed are NOT pinged as separate messages anymore (they are not actionable for Slava). Real rollbacks/deploy-fails are alerted by their own dedicated send_telegram calls in the engine/launcher. """ work_item_id = _get_work_item_id(task_id) logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}") # ORCH-017: hosts that are not clickable off the deploy box. A Plane web-base # resolving to one of these (the plane_api_url loopback default) means "no usable # browser URL" -> the Plane link is omitted rather than emitted broken (ADR-001 Р-3). _LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "0.0.0.0", "::1"}) def _is_loopback_base(url: str) -> bool: """True if the URL's host is a loopback/local address (not clickable off-host). Empty/garbage URLs count as loopback (i.e. unusable) so callers omit the link. 
""" if not url: return True try: from urllib.parse import urlparse host = (urlparse(url).hostname or "").lower() return (not host) or host in _LOOPBACK_HOSTS except Exception: return True def _get_task_link_fields(task_id: int): """ORCH-017: read (repo, branch, plane_issue_id) for a task. Never raises. Returns (None, None, None) on any error / missing row so link building can degrade gracefully (AC-6). """ try: from .db import get_db conn = get_db() row = conn.execute( "SELECT repo, branch, plane_issue_id FROM tasks WHERE id=?", (task_id,) ).fetchone() conn.close() if not row: return None, None, None return row["repo"], row["branch"], row["plane_issue_id"] except Exception as e: logger.warning(f"_get_task_link_fields({task_id}) failed: {e}") return None, None, None def _build_brd_link(repo, branch, work_item_id) -> str | None: """ORCH-017: '' to 01-brd.md in Gitea branch-view, or None if data missing. Mirrors the canonical branch-view pattern in src/usage.py: base = gitea_public_url or gitea_url, owner = gitea_owner (AC-1/AC-3). The href is html.escaped as defence-in-depth even though parts come from trusted config/DB (AC-7). """ s = _get_settings() base = ( getattr(s, "gitea_public_url", "") or getattr(s, "gitea_url", "") ).rstrip("/") owner = getattr(s, "gitea_owner", "") if not (base and owner and repo and branch and work_item_id): return None url = ( f"{base}/{owner}/{repo}/src/branch/{branch}" f"/docs/work-items/{work_item_id}/01-brd.md" ) return ( f'' f"\U0001f4c4 Открыть BRD" ) def _build_plane_issue_link(repo, plane_issue_id) -> str | None: """ORCH-017: '' to the Plane issue browser page, or None if unusable. Full path per ADR-001 Р-2: ``{web_base}/{workspace_slug}/projects/{project_id}/issues/{issue_id}/``. web_base = plane_web_url or plane_api_url (AC-3); a loopback base is treated as "no web URL" and the link is omitted (loopback-guard, AC-2/AC-6). 
""" s = _get_settings() web_base = ( getattr(s, "plane_web_url", "") or getattr(s, "plane_api_url", "") ).rstrip("/") workspace = getattr(s, "plane_workspace_slug", "") if not (web_base and workspace and plane_issue_id) or _is_loopback_base(web_base): return None try: from .projects import get_project_by_repo project = get_project_by_repo(repo) if repo else None except Exception: project = None if not project or not getattr(project, "plane_project_id", ""): return None url = ( f"{web_base}/{workspace}/projects/{project.plane_project_id}" f"/issues/{plane_issue_id}/" ) return ( f'' f"✅ Задача в Plane" ) def notify_approve_requested(task_id: int): """ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved. Also starts the BRD-review clock and refreshes the tracker so the '⏸️ Ревью БРД · твоё время ⏳' line appears. """ work_item_id = _get_work_item_id(task_id) try: from .db import mark_brd_review_started mark_brd_review_started(task_id) except Exception as e: logger.warning(f"notify_approve_requested: brd clock start failed: {e}") msg = ( f"\U0001f4cb {html.escape(work_item_id)}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. " f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved " f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f." ) # ORCH-017: embed direct links to the BRD doc (Gitea) and the Plane issue so # the reviewer can open both straight from the ping. Each link is built # independently and omitted if its data is missing; building is defensive so # it can NEVER break the alert (AC-1/AC-2/AC-6). Still exactly one notifying # message (AC-5); the call to action above is always preserved (AC-4). 
try: repo, branch, plane_issue_id = _get_task_link_fields(task_id) links = [ link for link in ( _build_brd_link(repo, branch, work_item_id), _build_plane_issue_link(repo, plane_issue_id), ) if link ] if links: msg = msg + "\n\n" + "\n".join(links) except Exception as e: logger.warning(f"notify_approve_requested({task_id}): link build failed: {e}") logger.info(msg) update_task_tracker(task_id) send_telegram(msg) # separate, notifying def notify_done(task_id: int): """Task completion: refresh the tracker to its final ГОТОВО form (no separate ping).""" work_item_id = _get_work_item_id(task_id) logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!") update_task_tracker(task_id) def notify_error(task_id: int, error: str): """ALERT (separate, notifying): task error.""" work_item_id = _get_work_item_id(task_id) if task_id else "system" msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}" logger.error(msg) send_telegram(msg) # separate, notifying