Files
orchestrator/src/notifications.py
claude-bot 69a4aaab99
All checks were successful
CI / test (push) Successful in 12s
CI / test (pull_request) Successful in 12s
feat(notifications): direct BRD + Plane links in approve ping (ORCH-017)
notify_approve_requested now embeds two HTML <a> links into the single
notifying approve-gate message: a Gitea branch-view link to 01-brd.md and a
Plane issue browser link. Adds ORCH_PLANE_WEB_URL (external Plane web URL,
fallback to plane_api_url) with a loopback-guard that omits the Plane link
when the resolved base is localhost/empty (no broken localhost URLs in prod).

Each link is built independently and omitted on missing data; the message and
the "flip to Approved" call to action are always sent as exactly one ping. The
shared send_telegram helper is left untouched (min blast radius for the
self-hosting prod container). Dynamic labels are html.escaped; parse_mode=HTML
preserved. QG registry / stages / approve handler unchanged.

Docs updated in-PR: CHANGELOG, .env.example, INFRA env map.
Tests: test_notify_approve_links.py, test_analysis_approve_flow_links.py.

Refs: ORCH-017
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-05 17:58:00 +00:00

698 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Notifications and logging for orchestrator events.
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
messages per task (agent start / finish / stage transition / QG-pending / tech
noise), the orchestrator now maintains ONE live tracker message per task that is
edited in place (editMessageText) on every stage transition. Only events that
NEED Slava's attention are sent as SEPARATE, notifying messages:
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
* deploy failed / rolled back — send_telegram from launcher/engine
* agent failed (exit_code != 0) — send_telegram from launcher
* task error (notify_error)
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
agent-start, agent-finish and QG-pending no longer emit their own messages — they
just refresh the tracker (or are log-only).
"""
import html
import logging
import httpx
logger = logging.getLogger("orchestrator")
# The settings object is imported lazily (on first use) to avoid circular
# imports at module level; the loaded object is cached here afterwards.
_settings = None


def _get_settings():
    """Return the shared settings object, importing it from .config on first use.

    The loaded object is cached in the module-level ``_settings`` so the import
    statement runs only once.
    """
    global _settings
    if _settings is not None:
        return _settings
    from .config import settings as loaded_settings

    _settings = loaded_settings
    return _settings
# --------------------------------------------------------------------------- #
# Low-level Telegram primitives
# --------------------------------------------------------------------------- #
def send_telegram(text: str, disable_notification: bool = False):
    """Send a message to the configured Telegram chat. Best-effort, never raises
    on a send failure.

    Args:
        text: Message body; it is sent with parse_mode=HTML.
        disable_notification: When True the message is delivered silently.

    Returns:
        The Telegram message_id on success, else None (so callers that want to
        track the message -- the tracker -- can store it; other callers may
        ignore the return value). None is also returned, without any request,
        when the bot token or chat id is not configured.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        return None
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_notification": disable_notification,
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return data["result"]["message_id"]
        # ok:false -> Telegram rejected the message; leave a trace instead of
        # losing the alert silently.
        desc = str(data.get("description") or "")
        logger.warning(f"send_telegram: message rejected ({desc!r})")
    except Exception as e:
        # Never crash the orchestrator because a notification failed. Only the
        # exception type is logged so the token-bearing request URL can never
        # end up in the log through the exception text.
        logger.warning(f"send_telegram: send failed ({type(e).__name__})")
    return None
# Outcome codes returned by edit_telegram; update_task_tracker branches on them:
#   "ok"            edit applied -> nothing else to do
#   "not_modified"  Telegram says the text is identical (400 "message is not
#                   modified" / "exactly the same") -> success, NO new message
#   "gone"          original message can't be edited (deleted / too old /
#                   invalid id) -> caller must fall back to a NEW message
#   "failed"        transient failure (network / timeout / 5xx / unknown 400)
#                   -> caller must NOT send a new message (avoid duplicates)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"

# Substrings of a Telegram error description that mean the message is
# permanently un-editable (it is gone / orphaned) -> fall back to a fresh
# message. Matched against the lower-cased description.
_GONE_MARKERS = (
    "message to edit not found",
    "message can't be edited",
    "message_id_invalid",
)

# Substrings of a Telegram error description that mean "nothing changed" ->
# treated as success, never a duplicate. Matched against the lower-cased
# description.
_NOT_MODIFIED_MARKERS = (
    "message is not modified",
    "exactly the same",
)
def edit_telegram(message_id: int, text: str) -> str:
    """Replace the text of an existing Telegram message. Never raises on a
    request failure.

    Args:
        message_id: Id of the message to edit in the configured chat.
        text: New message body; it is sent with parse_mode=HTML.

    Returns:
        One of the EDIT_* outcome codes: EDIT_OK when Telegram applied the edit,
        EDIT_NOT_MODIFIED when Telegram reports the text is unchanged, EDIT_GONE
        when the message can no longer be edited, and EDIT_FAILED for any other
        rejection, any exception, or missing bot token / chat id.
    """
    cfg = _get_settings()
    if not (cfg.telegram_bot_token and cfg.telegram_chat_id):
        return EDIT_FAILED

    try:
        endpoint = f"https://api.telegram.org/bot{cfg.telegram_bot_token}/editMessageText"
        payload = {
            "chat_id": cfg.telegram_chat_id,
            "message_id": message_id,
            "text": text,
            "parse_mode": "HTML",
        }
        body = httpx.post(endpoint, json=payload, timeout=5).json()
        if body.get("ok"):
            return EDIT_OK

        # ok:false -> classify the rejection by its (lower-cased) description.
        desc = str(body.get("description") or "").lower()

        def _mentions(markers) -> bool:
            for marker in markers:
                if marker in desc:
                    return True
            return False

        if _mentions(_NOT_MODIFIED_MARKERS):
            # Identical text between transitions: nothing to do, NOT a duplicate.
            logger.debug(
                f"edit_telegram(mid={message_id}): not modified, skipping"
            )
            return EDIT_NOT_MODIFIED

        if _mentions(_GONE_MARKERS):
            logger.warning(
                f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
                f"will fall back to a new message"
            )
            return EDIT_GONE

        # Any other rejection is treated as transient/unknown: do NOT duplicate.
        logger.warning(
            f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
        )
        return EDIT_FAILED
    except Exception as e:
        # Network / timeout / unparsable response -> transient, do NOT duplicate.
        logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
        return EDIT_FAILED
def _get_work_item_id(task_id: int) -> str:
"""Get work_item_id from DB by task_id."""
try:
from .db import get_db
conn = get_db()
row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0] if row and row[0] else f"task-{task_id}"
except Exception:
return f"task-{task_id}"
# --------------------------------------------------------------------------- #
# Live task tracker
# --------------------------------------------------------------------------- #
# Pipeline stages shown in the tracker, in order. Each entry is
# (stage key, display label, agent whose agent_runs rows describe that stage's
# work). The BRD review is NOT an agent stage -- it is the human approve gate
# rendered between Analysis and Architecture from the task's brd_review_*
# timestamps.
_TRACKER_STAGES = [
    ("analysis", "Analysis", "analyst"),
    ("architecture", "Architecture", "architect"),
    ("development", "Development", "developer"),
    ("review", "Review", "reviewer"),
    ("testing", "Testing", "tester"),
    ("deploy", "Deploy", "deployer"),
]

# Display label of the human BRD-review line in the tracker.
_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414"  # "Ревью БРД"

# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
# (development is entered after architecture finishes, etc.) Used to render the
# in-progress line for the currently-active stage.
_STAGE_ACTIVE_AGENT = {
    "analysis": "analyst",
    "architecture": "architect",
    "development": "developer",
    "review": "reviewer",
    "testing": "tester",
    "deploy": "deployer",
}
def _fmt_minutes(seconds) -> str:
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
try:
seconds = int(seconds or 0)
except (TypeError, ValueError):
seconds = 0
if seconds <= 0:
return ""
if seconds < 60:
return "<1м"
return f"{seconds // 60}\u043c"
def _parse_sql_ts(ts):
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
if not ts:
return None
from datetime import datetime, timezone
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
continue
return None
def _duration_seconds(started, finished):
    """Whole seconds elapsed from ``started`` to ``finished``.

    Both arguments are timestamps accepted by _parse_sql_ts.

    Returns:
        The elapsed time as an int, clamped at 0 when ``finished`` precedes
        ``started``; None when either timestamp is missing or unparseable.
    """
    begin, end = _parse_sql_ts(started), _parse_sql_ts(finished)
    if begin is None or end is None:
        return None
    elapsed = int((end - begin).total_seconds())
    return elapsed if elapsed > 0 else 0
def render_task_tracker(task_id: int) -> str:
    """Build the full live-tracker text for a task from the DB (stateless render).

    Reads the task header (work_item_id, title, stage, timestamps), every
    agent_runs row and the BRD-review timestamps, then renders, in order:

    - a header line (with a "done" banner once the task's stage is "done")
      followed by a separator bar;
    - per tracker stage, either the live "in progress" line (with an attempt
      counter from the 2nd run on) or the result line of the latest finished,
      successful run: duration, input/output tokens, cost and model;
    - right after Analysis, the BRD human-review line: the review duration, or
      the time waited so far plus an hourglass while the review is still open;
    - a separator bar and the token/cost totals across ALL runs;
    - on done: total wall time, agent time and review time, plus the line
      returned by _done_link when there is one.

    The tracker is pushed to Telegram with parse_mode=HTML, so every rendered
    duration is HTML-escaped (the sub-minute marker contains a literal '<').

    Args:
        task_id: Primary key of the row in ``tasks``.

    Returns:
        The tracker text, or the minimal fallback ``task-<task_id>`` when the
        task row is missing or the DB read fails.
    """
    from .db import get_db
    from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name

    try:
        conn = get_db()
        try:
            task = conn.execute(
                "SELECT id, work_item_id, title, stage, created_at, updated_at, "
                "brd_review_started_at, brd_review_ended_at "
                "FROM tasks WHERE id=?",
                (task_id,),
            ).fetchone()
            if not task:
                return f"task-{task_id}"
            runs = conn.execute(
                "SELECT agent, started_at, finished_at, exit_code, input_tokens, "
                "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
                "FROM agent_runs WHERE task_id=? ORDER BY id ASC",
                (task_id,),
            ).fetchall()
        finally:
            # Close even when a query raises, so the connection cannot leak.
            conn.close()
    except Exception as e:
        logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
        return f"task-{task_id}"

    work_item_id = task["work_item_id"] or f"task-{task_id}"
    title = task["title"] or work_item_id
    stage = task["stage"] or "created"
    done = stage == "done"

    def _dur(seconds) -> str:
        # _fmt_minutes may return a string with a raw '<'; the text is sent as
        # Telegram HTML, where an unescaped '<' makes the whole message invalid.
        return html.escape(_fmt_minutes(seconds))

    # Latest completed run per agent (a stage may have multiple runs on retry;
    # we show the most recent FINISHED, successful run for the stage line).
    last_done = {}
    agent_runs_by_agent = {}
    for r in runs:
        agent_runs_by_agent.setdefault(r["agent"], []).append(r)
        if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
            last_done[r["agent"]] = r

    # Totals across ALL runs (every input/output token + cost counts).
    total_in = 0
    total_out = 0
    total_cost = 0.0
    agent_seconds = 0
    for r in runs:
        usage = {
            "input_tokens": r["input_tokens"],
            "cache_read_tokens": r["cache_read_tokens"],
            "cache_creation_tokens": r["cache_creation_tokens"],
        }
        total_in += _input_total(usage)
        total_out += int(r["output_tokens"] or 0)
        total_cost += float(r["cost_usd"] or 0.0)
        d = _duration_seconds(r["started_at"], r["finished_at"])
        if d is not None:
            agent_seconds += d

    esc_title = html.escape(title)
    header = (
        f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
        if done
        else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
    )
    bar = "\u2501" * 22
    lines = [header, bar]

    def _stage_line(label, run):
        # Result line of one finished run: duration, tokens in/out, cost, model.
        usage = {
            "input_tokens": run["input_tokens"],
            "cache_read_tokens": run["cache_read_tokens"],
            "cache_creation_tokens": run["cache_creation_tokens"],
        }
        in_tok = fmt_tokens(_input_total(usage))
        out_tok = fmt_tokens(run["output_tokens"])
        cost = fmt_cost(run["cost_usd"])
        dur = _dur(_duration_seconds(run["started_at"], run["finished_at"]))
        model = short_model_name(run["model"])
        model_suffix = f" \u00b7 {model}" if model else ""
        return (
            f"\u2705 {label:<13} {dur} \u00b7 "
            f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
        )

    # BRD review line: rendered right after Analysis once the review clock has
    # been started. Time = human review delta.
    brd_started = task["brd_review_started_at"]
    brd_ended = task["brd_review_ended_at"]
    review_seconds = _duration_seconds(brd_started, brd_ended)

    for stage_key, label, agent in _TRACKER_STAGES:
        run = last_done.get(agent)
        # The stage is "in progress" only when it is the task's current stage
        # AND either its agent has an unfinished run or it has no finished,
        # successful run yet. A finished run with no in-flight run shows the
        # result line, even if the task still sits in that stage
        # (just-finished snapshot).
        agent_runs = agent_runs_by_agent.get(agent, [])
        has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
        is_active_stage = (
            _STAGE_ACTIVE_AGENT.get(stage) == agent
            and stage == stage_key
            and (has_inflight or run is None)
        )
        if is_active_stage:
            # Live "in progress" line. Count how many times THIS stage's agent
            # has run for this task; a 2nd+ run means the stage is being
            # re-done (e.g. review -> development -> review), so show the
            # attempt number: the text then changes between cycles and the
            # re-work is visible.
            attempt = len(agent_runs)
            if attempt >= 2:
                lines.append(
                    f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
                    f"\u2026 \u0438\u0434\u0451\u0442"
                )
            else:
                lines.append(
                    f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
                )
        elif run is not None:
            lines.append(_stage_line(label, run))
        # else: not started yet -> not shown.

        # Insert the BRD review line right after Analysis.
        if stage_key == "analysis" and brd_started:
            brd_label = f"{_BRD_LABEL:<13}"
            if review_seconds is not None:
                dur = _dur(review_seconds)
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
                )
            else:
                # Still waiting on the human (ended not stamped yet).
                from datetime import datetime, timezone

                start_dt = _parse_sql_ts(brd_started)
                waited = None
                if start_dt is not None:
                    waited = int(
                        (datetime.now(timezone.utc) - start_dt).total_seconds()
                    )
                dur = _dur(waited) if waited is not None else "\u2026"
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
                )

    lines.append(bar)
    lines.append(
        f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
        f"{fmt_cost(total_cost)}"
    )
    if done:
        wall = _duration_seconds(task["created_at"], task["updated_at"])
        wall_str = _dur(wall) if wall is not None else "?"
        review_str = _dur(review_seconds) if review_seconds else ""
        lines.append(
            f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
            f"\u0430\u0433\u0435\u043d\u0442\u044b {_dur(agent_seconds)} \u00b7 "
            f"\u0442\u0432\u043e\u0451 {review_str}"
        )
        link = _done_link(task_id, task["work_item_id"])
        if link:
            lines.append(link)
    return "\n".join(lines)
def _done_link(task_id: int, work_item_id) -> str | None:
"""Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None."""
try:
from .config import settings
from .db import get_db
conn = get_db()
row = conn.execute(
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
if not row:
return None
repo, branch = row["repo"], row["branch"]
pr_part = None
try:
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "all", "head": branch},
headers=headers, timeout=5,
)
if resp.status_code == 200:
prs = resp.json()
if prs:
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
except Exception:
pr_part = None
parts = []
if pr_part:
parts.append(pr_part)
parts.append("\U0001f4e6 deployed")
return " \u00b7 ".join(parts)
except Exception:
return None
def update_task_tracker(task_id: int):
    """Render the live tracker for a task and push it to Telegram. Never raises.

    With no stored tracker message id the text is sent as a new, silent message
    and the returned message id is stored. With a stored id the existing
    message is edited instead, and the edit outcome decides what happens next:

    - EDIT_OK / EDIT_NOT_MODIFIED: done, nothing else is sent;
    - EDIT_FAILED: nothing else is sent (a new message here would duplicate
      the tracker); the stored id is kept;
    - EDIT_GONE: a fresh silent message is sent and, if that succeeds, the
      stored id is re-pointed at it.

    The tracker is always sent with disable_notification=True so that it never
    pings.
    """
    try:
        from .db import get_tracker_message_id, set_tracker_message_id

        rendered = render_task_tracker(task_id)
        mid = get_tracker_message_id(task_id)
        if mid is not None:
            outcome = edit_telegram(mid, rendered)
            if outcome == EDIT_FAILED:
                # Transient -> don't duplicate; the next refresh redraws it.
                logger.debug(
                    f"update_task_tracker({task_id}): edit failed transiently, "
                    f"keeping message {mid}"
                )
                return
            if outcome in (EDIT_OK, EDIT_NOT_MODIFIED):
                # Edited in place (or nothing to change) -> no duplicate.
                return
            # Otherwise (EDIT_GONE) the stored message is unusable: fall
            # through and replace it with a fresh one.

        fresh_id = send_telegram(rendered, disable_notification=True)
        if fresh_id is None:
            return
        set_tracker_message_id(task_id, fresh_id)
    except Exception as e:
        logger.warning(f"update_task_tracker({task_id}) failed: {e}")
# --------------------------------------------------------------------------- #
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
# --------------------------------------------------------------------------- #
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
    """Log a stage transition and refresh the live tracker.

    No separate Telegram message is sent. When ``agent`` is given, the log
    line also names the agent that was launched.
    """
    work_item_id = _get_work_item_id(task_id)
    launched = f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" if agent else ""
    logger.info(f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" + launched)
    update_task_tracker(task_id)
def notify_agent_started(run_id: int, agent: str, task_id: int):
    """Log an agent launch and refresh the tracker.

    No separate Telegram message is sent. The tracker is refreshed only when
    ``task_id`` is truthy.
    """
    work_item_id = _get_work_item_id(task_id)
    logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
    if not task_id:
        return
    update_task_tracker(task_id)
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
    """Log how an agent run ended and refresh the tracker.

    This helper only logs and refreshes; it sends no Telegram message of its
    own. The log line depends on ``exit_code``: 0 is a normal finish (with the
    duration in whole minutes when ``duration_s`` is truthy), -9 is reported as
    killed by timeout, anything else as a crash with the exit code. The tracker
    is refreshed only when ``task_id`` is truthy.
    """
    work_item_id = "?" if not task_id else _get_work_item_id(task_id)
    if exit_code == -9:
        msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)"
    elif exit_code == 0:
        dur = ""
        if duration_s:
            dur = f" ({duration_s // 60} \u043c\u0438\u043d)"
        msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}"
    else:
        msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
    logger.info(msg)
    if not task_id:
        return
    update_task_tracker(task_id)
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
    """Log the result of a quality-gate (QG) check. Log-only.

    A passed check is logged at INFO level, a failed one at WARNING level
    together with ``reason``. No Telegram message is sent and the tracker is
    not refreshed here.
    """
    work_item_id = _get_work_item_id(task_id)
    if not passed:
        logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
        return
    logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
    """Log a failed quality-gate (QG) check at WARNING level. Log-only.

    No Telegram message is sent. ``stage`` is accepted but does not appear in
    the log line.
    """
    item = _get_work_item_id(task_id)
    logger.warning(
        f"\u26a0\ufe0f {item}: QG {check} \u2014 failed: {reason}"
    )
# ORCH-017: hosts that are not clickable off the deploy box. A Plane web-base
# resolving to one of these (the plane_api_url loopback default) means "no usable
# browser URL" -> the Plane link is omitted rather than emitted broken (ADR-001 Р-3).
_LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "0.0.0.0", "::1"})
def _is_loopback_base(url: str) -> bool:
"""True if the URL's host is a loopback/local address (not clickable off-host).
Empty/garbage URLs count as loopback (i.e. unusable) so callers omit the link.
"""
if not url:
return True
try:
from urllib.parse import urlparse
host = (urlparse(url).hostname or "").lower()
return (not host) or host in _LOOPBACK_HOSTS
except Exception:
return True
def _get_task_link_fields(task_id: int):
    """ORCH-017: read (repo, branch, plane_issue_id) for a task. Never raises.

    Args:
        task_id: Primary key of the row in ``tasks``.

    Returns:
        The 3-tuple (repo, branch, plane_issue_id), or (None, None, None) on a
        missing row or any error, so that link building can degrade gracefully
        (AC-6). Errors are logged at WARNING level.
    """
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT repo, branch, plane_issue_id FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            # Close even when the query raises, so the connection cannot leak.
            conn.close()
        if not row:
            return None, None, None
        return row["repo"], row["branch"], row["plane_issue_id"]
    except Exception as e:
        logger.warning(f"_get_task_link_fields({task_id}) failed: {e}")
        return None, None, None
def _build_brd_link(repo, branch, work_item_id) -> str | None:
    """ORCH-017: HTML '<a>' pointing at 01-brd.md in the Gitea branch view.

    The base URL is the settings' gitea_public_url, falling back to gitea_url
    (trailing slashes stripped); the owner is gitea_owner (AC-1/AC-3). The
    href is passed through html.escape as defence-in-depth (AC-7).

    Returns:
        The anchor markup, or None when the base, owner, repo, branch or
        work_item_id is missing.
    """
    cfg = _get_settings()
    base = (
        getattr(cfg, "gitea_public_url", "") or getattr(cfg, "gitea_url", "")
    ).rstrip("/")
    owner = getattr(cfg, "gitea_owner", "")

    required = (base, owner, repo, branch, work_item_id)
    if not all(required):
        return None

    target = (
        f"{base}/{owner}/{repo}/src/branch/{branch}"
        f"/docs/work-items/{work_item_id}/01-brd.md"
    )
    href = html.escape(target, quote=True)
    return (
        f'<a href="{href}">'
        f"\U0001f4c4 Открыть BRD</a>"
    )
def _build_plane_issue_link(repo, plane_issue_id) -> str | None:
    """ORCH-017: HTML '<a>' pointing at the Plane issue browser page.

    Full path per ADR-001 Р-2:
    ``{web_base}/{workspace_slug}/projects/{project_id}/issues/{issue_id}/``.
    web_base is the settings' plane_web_url, falling back to plane_api_url
    (AC-3), with trailing slashes stripped. The project id comes from the
    project registered for ``repo``.

    Returns:
        The anchor markup, or None when the base, workspace slug or issue id
        is missing, when the base is a loopback address (loopback-guard,
        AC-2/AC-6), or when no project with a plane_project_id is found for
        ``repo``.
    """
    cfg = _get_settings()
    web_base = (
        getattr(cfg, "plane_web_url", "") or getattr(cfg, "plane_api_url", "")
    ).rstrip("/")
    workspace = getattr(cfg, "plane_workspace_slug", "")

    if not (web_base and workspace and plane_issue_id):
        return None
    if _is_loopback_base(web_base):
        return None

    try:
        from .projects import get_project_by_repo

        project = get_project_by_repo(repo) if repo else None
    except Exception:
        # A failed project lookup just means the link is omitted.
        project = None
    if not project:
        return None
    if not getattr(project, "plane_project_id", ""):
        return None

    target = (
        f"{web_base}/{workspace}/projects/{project.plane_project_id}"
        f"/issues/{plane_issue_id}/"
    )
    href = html.escape(target, quote=True)
    return (
        f'<a href="{href}">'
        f"✅ Задача в Plane</a>"
    )
def notify_approve_requested(task_id: int):
    """ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved.

    Steps, in order:

    1. start the BRD-review clock (a failure is logged and ignored);
    2. build the call-to-action text and append up to two links -- the BRD
       document in Gitea and the Plane issue (ORCH-017);
    3. log the message, refresh the tracker so that the BRD-review line
       appears, and send the message as one separate, notifying Telegram
       message.

    Each link is built in isolation: a missing value or an exception in one
    builder only omits that link, never the other link or the alert itself
    (AC-1/AC-2/AC-6). The call to action is always preserved (AC-4) and exactly
    one notifying message is sent (AC-5).
    """
    work_item_id = _get_work_item_id(task_id)
    try:
        from .db import mark_brd_review_started

        mark_brd_review_started(task_id)
    except Exception as e:
        logger.warning(f"notify_approve_requested: brd clock start failed: {e}")

    msg = (
        f"\U0001f4cb {html.escape(work_item_id)}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
        f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
        f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
    )

    # ORCH-017: direct links so the reviewer can open the BRD and the Plane
    # issue straight from the ping.
    try:
        repo, branch, plane_issue_id = _get_task_link_fields(task_id)
    except Exception as e:
        logger.warning(f"notify_approve_requested({task_id}): link build failed: {e}")
        repo = branch = plane_issue_id = None

    link_builders = (
        ("BRD", lambda: _build_brd_link(repo, branch, work_item_id)),
        ("Plane", lambda: _build_plane_issue_link(repo, plane_issue_id)),
    )
    links = []
    for label, build in link_builders:
        # One try per builder: a failure here must not drop the other link.
        try:
            link = build()
        except Exception as e:
            logger.warning(
                f"notify_approve_requested({task_id}): {label} link build failed: {e}"
            )
            continue
        if link:
            links.append(link)
    if links:
        msg = msg + "\n\n" + "\n".join(links)

    logger.info(msg)
    update_task_tracker(task_id)
    send_telegram(msg)  # separate, notifying
def notify_done(task_id: int):
    """Task completion: log it and refresh the tracker (no separate ping).

    The refreshed tracker is expected to show its final "done" form, since the
    renderer switches to it once the task's stage is "done".
    """
    item = _get_work_item_id(task_id)
    logger.info(
        f"\U0001f389 {item}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
    )
    update_task_tracker(task_id)
def notify_error(task_id: int, error: str):
    """ALERT (separate, notifying): report a task error.

    The raw error is logged at ERROR level. The Telegram copy is sent with
    parse_mode=HTML, so the work item id and the error text are HTML-escaped
    (a raw '<' or '&' would make Telegram reject the alert), and an over-long
    error text is truncated so that the alert stays within Telegram's message
    size limit.

    Args:
        task_id: Task the error belongs to; when falsy the alert is attributed
            to "system".
        error: Error description; any object is accepted and rendered with
            ``str()``.
    """
    max_error_chars = 3500  # leaves headroom below Telegram's 4096-char limit

    work_item_id = _get_work_item_id(task_id) if task_id else "system"
    logger.error(f"\U0001f534 {work_item_id}: ERROR \u2014 {error}")

    # Truncate BEFORE escaping so an HTML entity is never cut in half.
    detail = str(error)
    if len(detail) > max_error_chars:
        detail = detail[:max_error_chars] + "\u2026"
    msg = (
        f"\U0001f534 {html.escape(str(work_item_id))}: "
        f"ERROR \u2014 {html.escape(detail)}"
    )
    send_telegram(msg)  # separate, notifying