Files
orchestrator/src/notifications.py
claude-bot 9981f94e19
All checks were successful
CI / test (push) Successful in 25s
CI / test (pull_request) Successful in 23s
fix: disable Telegram link-preview in tracker notifications (ORCH-080)
Add "disable_web_page_preview": True to the JSON payload of both
low-level Telegram primitives — send_telegram (POST /sendMessage) and
edit_telegram (POST /editMessageText). Telegram no longer expands the
Plane "Modern project management" link-preview banner under every
tracker card (bump/edit) and notify/alert message, which the default
bump mode (ORCH-067) was duplicating on each transition.

Single-point fix at the primitive level — all consumers
(update_task_tracker, notify_approve_requested, notify_error, stage
alerts from launcher/stage_engine) inherit it without code changes.
parse_mode: HTML is preserved so the ORCH-NNN issue link stays
clickable; disable_notification, bump/edit logic, the one-card-per-task
invariant, return contracts and never-raise are untouched. Unconditional,
no kill-switch (ADR-001).

Tests: tests/test_link_preview_disabled.py (TC-01..06). Docs: CHANGELOG,
CLAUDE.md, docs/architecture/README.md (Notifications component).

Refs: ORCH-080
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 01:24:04 +03:00

1070 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Notifications and logging for orchestrator events.
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
messages per task (agent start / finish / stage transition / QG-pending / tech
noise), the orchestrator now maintains ONE live tracker message per task that is
edited in place (editMessageText) on every stage transition. Only events that
NEED Slava's attention are sent as SEPARATE, notifying messages:
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
* deploy failed / rolled back — send_telegram from launcher/engine
* agent failed (exit_code != 0) — send_telegram from launcher
* task error (notify_error)
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
agent-start, agent-finish and QG-pending no longer emit their own messages — they
just refresh the tracker (or are log-only).
"""
import html
import logging
import httpx
logger = logging.getLogger("orchestrator")
# Lazy import to avoid circular imports at module level
_settings = None


def _get_settings():
    """Return the shared ``settings`` object, importing it on first use.

    The import of ``.config`` is deferred to call time and the result is
    memoised in the module-level ``_settings``, so later calls are a plain
    global read.
    """
    global _settings
    if _settings is not None:
        return _settings
    from .config import settings as loaded_settings

    _settings = loaded_settings
    return _settings
# --------------------------------------------------------------------------- #
# Low-level Telegram primitives
# --------------------------------------------------------------------------- #
def send_telegram(text: str, disable_notification: bool = False):
    """Send a message to the configured Telegram chat. Never raises.

    Args:
        text: Message body; sent with ``parse_mode: HTML``.
        disable_notification: Forwarded to Telegram as-is; ``True`` delivers
            the message silently.

    Returns:
        The Telegram ``message_id`` on success, else ``None`` (missing
        credentials, an ``ok: false`` reply, or any network / timeout /
        malformed-reply error). Callers that track the message (the tracker)
        store the id; fire-and-forget callers ignore it.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        return None
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_notification": disable_notification,
                # ORCH-080: suppress the Plane link-preview banner that Telegram
                # would otherwise expand under every tracker card / notification.
                "disable_web_page_preview": True,
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return data["result"]["message_id"]
        # ok:false (e.g. an HTML parse error) -> make the rejection visible in
        # the log instead of dropping it silently.
        desc = str(data.get("description") or "")
        logger.warning(f"send_telegram: send failed ({desc!r})")
    except Exception as e:
        # Never crash the orchestrator due to a notification failure. Only the
        # exception type is logged: the request URL embeds the bot token, so
        # the exception text is kept out of the log in case it echoes the URL.
        logger.warning(f"send_telegram: transient error: {type(e).__name__}")
    return None
# Telegram error descriptions that mean a deleteMessage target is already gone /
# can't be deleted (>48h, already deleted, invalid id). Treated as "no longer our
# problem" -> the caller proceeds to send a fresh card. NOT a transient failure.
_DELETE_GONE_MARKERS = (
    "message to delete not found",
    "message can't be deleted",
    "message_id_invalid",
)


def delete_telegram(message_id: int) -> bool:
    """Delete one Telegram message from the configured chat. Never raises.

    Returns:
        True when the message is gone after the call: Telegram answered
        ``ok``, or its error description matches ``_DELETE_GONE_MARKERS``
        (already deleted / cannot be deleted / invalid id), so the caller may
        go on and send a fresh card.
        False when credentials are missing, or on any other failure (unknown
        ``ok: false`` reply, network error, timeout) where the old message may
        still be alive.
    """
    cfg = _get_settings()
    if not (cfg.telegram_bot_token and cfg.telegram_chat_id):
        # Without credentials nothing can be deleted; same no-op path as the
        # other Telegram helpers.
        return False
    try:
        endpoint = f"https://api.telegram.org/bot{cfg.telegram_bot_token}/deleteMessage"
        payload = {
            "chat_id": cfg.telegram_chat_id,
            "message_id": message_id,
        }
        reply = httpx.post(endpoint, json=payload, timeout=5).json()
        if reply.get("ok"):
            return True
        # ok:false -> classify by the (lower-cased) error description.
        desc = str(reply.get("description") or "").lower()
        already_gone = any(marker in desc for marker in _DELETE_GONE_MARKERS)
        if already_gone:
            # Expected, non-transient outcome: the old message is not there.
            logger.debug(
                f"delete_telegram(mid={message_id}): already gone ({desc!r})"
            )
        else:
            # Anything unrecognised is treated as transient.
            logger.warning(
                f"delete_telegram(mid={message_id}): delete failed ({desc!r})"
            )
        return already_gone
    except Exception as e:
        # Network / timeout / bad reply -> transient; message may still exist.
        logger.warning(f"delete_telegram(mid={message_id}): transient error: {e}")
        return False
# edit_telegram outcome codes -> let update_task_tracker decide what to do:
# "ok" edit applied -> nothing else to do
# "not_modified" Telegram says text is identical (400 "message is not
# modified" / "exactly the same") -> success, NO new message
# "gone" original message can't be edited (deleted / too old /
# invalid id) -> caller must fall back to a NEW message
# "failed" transient failure (network / timeout / 5xx / unknown 400)
# -> caller must NOT send a new message (avoid duplicates)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"

# Error descriptions meaning the message can never be edited again (deleted,
# orphaned, invalid id) -> the caller falls back to a fresh message.
_GONE_MARKERS = (
    "message to edit not found",
    "message can't be edited",
    "message_id_invalid",
)

# Error descriptions meaning "nothing changed" -> success, never a duplicate.
_NOT_MODIFIED_MARKERS = (
    "message is not modified",
    "exactly the same",
)


def edit_telegram(message_id: int, text: str) -> str:
    """Replace the text of an existing Telegram message. Never raises.

    Returns one of the ``EDIT_*`` outcome codes:
        EDIT_OK            the edit was applied;
        EDIT_NOT_MODIFIED  Telegram reported the text is unchanged;
        EDIT_GONE          the message cannot be edited any more, so the
                           caller should send a new one;
        EDIT_FAILED        missing credentials, an unrecognised ``ok: false``
                           reply, or a network / timeout error; the caller
                           must NOT send a new message (avoids duplicates).
    """
    cfg = _get_settings()
    if not (cfg.telegram_bot_token and cfg.telegram_chat_id):
        return EDIT_FAILED
    try:
        endpoint = f"https://api.telegram.org/bot{cfg.telegram_bot_token}/editMessageText"
        payload = {
            "chat_id": cfg.telegram_chat_id,
            "message_id": message_id,
            "text": text,
            "parse_mode": "HTML",
            # ORCH-080: suppress the Plane link-preview banner (see send_telegram).
            "disable_web_page_preview": True,
        }
        reply = httpx.post(endpoint, json=payload, timeout=5).json()
        if reply.get("ok"):
            return EDIT_OK

        # ok:false -> classify by the (lower-cased) error description.
        desc = str(reply.get("description") or "").lower()

        def _mentions(markers):
            return any(marker in desc for marker in markers)

        if _mentions(_NOT_MODIFIED_MARKERS):
            # Identical text between transitions (e.g. a repeated review cycle
            # rendering the same line): nothing to do, not a duplicate.
            logger.debug(
                f"edit_telegram(mid={message_id}): not modified, skipping"
            )
            return EDIT_NOT_MODIFIED
        if _mentions(_GONE_MARKERS):
            logger.warning(
                f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
                f"will fall back to a new message"
            )
            return EDIT_GONE
        # Unrecognised non-ok reply -> treat as transient, do NOT duplicate.
        logger.warning(
            f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
        )
        return EDIT_FAILED
    except Exception as e:
        # Network / timeout / bad reply -> transient, do NOT duplicate.
        logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
        return EDIT_FAILED
def _get_work_item_id(task_id: int) -> str:
"""Get work_item_id from DB by task_id."""
try:
from .db import get_db
conn = get_db()
row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0] if row and row[0] else f"task-{task_id}"
except Exception:
return f"task-{task_id}"
# --------------------------------------------------------------------------- #
# Live task tracker
# --------------------------------------------------------------------------- #
# Pipeline stages shown in the tracker, in display order. Each entry is
# (stage key, display label, agent whose agent_runs rows describe the stage).
# The BRD approve gate is NOT an agent stage — it is the human gate rendered
# between Analysis and Architecture from the task's brd_review_* timestamps
# (its label is _BRD_LABEL below).
# ORCH-042 (BR-11): display labels are Russian. Stage KEYS (analysis, …) and
# agent names (analyst, …) are NOT touched — they are wired to
# _STAGE_ACTIVE_AGENT, last_done and the DB. Only the 2nd tuple element changed.
_TRACKER_STAGES = [
("analysis", "Анализ", "analyst"), # label: "Analysis"
("architecture", "Архитектура", "architect"), # label: "Architecture"
("development", "Разработка", "developer"), # label: "Development"
("review", "Код ревью", "reviewer"), # label: "Code review"
("testing", "Тестирование", "tester"), # label: "Testing"
("deploy", "Внедрение", "deployer"), # label: "Deployment"
]
# Label of the human BRD approve-gate line ("BRD confirmation").
# ORCH-042 (BR-9): "Подтверждение BRD" (was "Ревью БРД").
_BRD_LABEL = "Подтверждение BRD"
# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
# (development is entered after architecture finishes, etc.) Used to render the
# "🔄 <Stage> … идёт" ("in progress") line for the currently-active stage.
_STAGE_ACTIVE_AGENT = {
"analysis": "analyst",
"architecture": "architect",
"development": "developer",
"review": "reviewer",
"testing": "tester",
"deploy": "deployer",
}
def _fmt_minutes(seconds) -> str:
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
try:
seconds = int(seconds or 0)
except (TypeError, ValueError):
seconds = 0
if seconds <= 0:
return ""
if seconds < 60:
return "<1м"
return f"{seconds // 60}\u043c"
def _parse_sql_ts(ts):
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
if not ts:
return None
from datetime import datetime, timezone
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
continue
return None
def _duration_seconds(started, finished):
    """Whole seconds elapsed from ``started`` to ``finished``.

    Both arguments are SQL timestamps handed to ``_parse_sql_ts``. Returns
    None when either one is missing or unparseable; a negative span is
    clamped to 0.
    """
    begin, end = _parse_sql_ts(started), _parse_sql_ts(finished)
    if begin is None or end is None:
        return None
    elapsed = int((end - begin).total_seconds())
    return elapsed if elapsed > 0 else 0
def render_task_tracker(task_id: int) -> str:
    """Build the full live-tracker text for a task from the DB (stateless render).

    Reads the task row (work_item_id, title, stage, timestamps, repo,
    plane_issue_id) and every agent_runs row of the task, then renders, in order:
      - a header with the (linked) issue number and the escaped title, a
        Plane-status line and a separator bar,
      - while the task is not done and task-deps are enabled: a "waiting on"
        line listing the unfinished dependencies,
      - one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost> · <model>' line per stage
        that has a finished run (the latest finished run of that stage's agent
        whose exit code is 0 or NULL),
      - a '🔄 <Stage> …' line for the active (in-progress) stage,
      - the BRD approve-gate line right after Analysis (✅ once both review
        timestamps are set, ⏸️ + ⏳ while only the start is stamped),
      - the '💰 <in>↓ / <out>↑ · <cost>' totals across ALL runs,
      - on done: a total / agents / review time line and the ``_done_link`` line.

    Returns the lines joined with newlines. A DB error or a missing task row
    yields the minimal fallback string ``task-<id>``.

    NOTE(review): only the DB read is wrapped in try/except; the imports above
    it and the rendering below it are not, so "never raises" holds only as far
    as the helpers called here do not raise — confirm. The connection is also
    not closed when one of the two queries raises.
    """
    from .db import get_db
    from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name
    try:
        conn = get_db()
        task = conn.execute(
            "SELECT id, work_item_id, title, stage, created_at, updated_at, "
            "brd_review_started_at, brd_review_ended_at, repo, plane_issue_id "
            "FROM tasks WHERE id=?",
            (task_id,),
        ).fetchone()
        if not task:
            conn.close()
            return f"task-{task_id}"
        runs = conn.execute(
            "SELECT agent, started_at, finished_at, exit_code, input_tokens, "
            "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
            "FROM agent_runs WHERE task_id=? ORDER BY id ASC",
            (task_id,),
        ).fetchall()
        conn.close()
    except Exception as e:
        logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
        return f"task-{task_id}"
    work_item_id = task["work_item_id"] or f"task-{task_id}"
    title = task["title"] or work_item_id
    stage = task["stage"] or "created"
    done = stage == "done"
    # Latest completed run per agent (a stage may have multiple runs on retry;
    # we show the most recent FINISHED, successful run for the stage line).
    # Rows arrive ordered by id ASC, so a later row overwrites an earlier one.
    last_done = {}
    agent_runs_by_agent = {}
    for r in runs:
        agent_runs_by_agent.setdefault(r["agent"], []).append(r)
        if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
            last_done[r["agent"]] = r
    # Totals across ALL runs (every input/output token + cost counts).
    total_in = 0
    total_out = 0
    total_cost = 0.0
    agent_seconds = 0
    for r in runs:
        usage = {
            "input_tokens": r["input_tokens"],
            "cache_read_tokens": r["cache_read_tokens"],
            "cache_creation_tokens": r["cache_creation_tokens"],
        }
        total_in += _input_total(usage)
        total_out += int(r["output_tokens"] or 0)
        total_cost += float(r["cost_usd"] or 0.0)
        d = _duration_seconds(r["started_at"], r["finished_at"])
        if d is not None:
            agent_seconds += d
    # The title is escaped because the message is sent with parse_mode HTML.
    esc_title = html.escape(title)
    # ORCH-067 (req 3): the issue number in the header is now a clickable link to
    # the Plane issue (degrades to the escaped number when no web URL — fail-safe).
    task_repo = _row_get(task, "repo")
    task_issue_id = _row_get(task, "plane_issue_id")
    num_html = plane_issue_link(work_item_id, plane_issue_id=task_issue_id, repo=task_repo)
    # Done header: "🎉 <num> · <title> — ГОТОВО" ("DONE"); otherwise "🛠️ <num> · <title>".
    header = (
        f"\U0001f389 {num_html} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
        if done
        else f"\U0001f6e0\ufe0f {num_html} \u00b7 {esc_title}"
    )
    bar = "\u2501" * 22
    # ORCH-067 (req 2): a Plane-status line (model ORCH-066) under the header.
    # Built fail-safe: any error degrades to a stage default, never breaks render.
    try:
        status_label = _card_status_label(
            task, repo=task_repo, plane_issue_id=task_issue_id
        )
    except Exception:
        status_label = _DEFAULT_STATUS_LABEL
    status_line = f"\U0001f4cd {status_label}"
    lines = [header, status_line, bar]
    # ORCH-026 (B-4): waiting-line for a task blocked by an unfinished declared
    # dependency. Shows WHAT the task is waiting on ("⏳ ждёт ORCH-NNN", i.e.
    # "waiting for ORCH-NNN"), so the single tracker card (invariant preserved)
    # makes the wait visible. Never breaks the render: any error -> no waiting-line.
    if not done:
        try:
            from . import task_deps
            # Bound as a LOCAL name here (no `global` statement in this
            # function), so the module-level _settings cache is not touched.
            from .config import settings as _settings
            if getattr(_settings, "task_deps_enabled", False):
                ready, waiting_on = task_deps.is_task_ready(task_id)
                if not ready and waiting_on:
                    waits = ", ".join(link_for(w) for w in waiting_on)
                    lines.append(f"⏳ ждёт {waits}")
        except Exception:
            pass
    def _stage_line(label, run):
        # One "✅ <label> <dur> · <in>↓/<out>↑ · <cost>[ · <model>]" line for a
        # finished run; the model suffix is omitted when the short name is empty.
        usage = {
            "input_tokens": run["input_tokens"],
            "cache_read_tokens": run["cache_read_tokens"],
            "cache_creation_tokens": run["cache_creation_tokens"],
        }
        in_tok = fmt_tokens(_input_total(usage))
        out_tok = fmt_tokens(run["output_tokens"])
        cost = fmt_cost(run["cost_usd"])
        dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
        model = short_model_name(run["model"])
        model_suffix = f" \u00b7 {model}" if model else ""
        return (
            f"\u2705 {label:<13} {dur} \u00b7 "
            f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
        )
    # BRD review line: between Analysis and Architecture, only once the review
    # start timestamp is stamped (i.e. the gate is live). Time = human review delta.
    brd_started = task["brd_review_started_at"]
    brd_ended = task["brd_review_ended_at"]
    review_seconds = _duration_seconds(brd_started, brd_ended)
    for stage_key, label, agent in _TRACKER_STAGES:
        run = last_done.get(agent)
        # The stage is "in progress" only when it is the task's current stage AND
        # there is an unfinished run for its agent (the agent is actually still
        # working) or no finished run at all. A finished run with no in-flight
        # run -> show the ✅ result, even if the task still sits in that stage
        # (just-finished snapshot).
        agent_runs = agent_runs_by_agent.get(agent, [])
        has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
        is_active_stage = (
            _STAGE_ACTIVE_AGENT.get(stage) == agent
            and stage == stage_key
            and (has_inflight or run is None)
        )
        if is_active_stage:
            # Live "🔄 ... идёт" ("in progress") line. Count how many times THIS
            # stage's agent has run for this task; a 2nd+ run means we're
            # re-doing the stage (e.g. review->development->review), so show
            # "попытка N" ("attempt N") to make the text change between cycles
            # and to honestly show Slava the stage is being re-worked.
            attempt = len(agent_runs)
            if attempt >= 2:
                lines.append(
                    f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
                    f"\u2026 \u0438\u0434\u0451\u0442"
                )
            else:
                lines.append(
                    f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
                )
        elif run is not None:
            lines.append(_stage_line(label, run))
        # else: not started yet -> not shown.
        # Insert the BRD review line right after Analysis.
        if stage_key == "analysis" and brd_started:
            brd_label = f"{_BRD_LABEL:<13}"
            if review_seconds is not None:
                # ORCH-042 (BR-10): approve-gate passed -> ✅ (was ⏸️). The
                # still-waiting branch below keeps ⏸️ + ⏳ unchanged.
                dur = _fmt_minutes(review_seconds)
                lines.append(
                    f"\u2705 {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
                )
            else:
                # Still waiting on the human (ended not stamped yet, or a
                # timestamp could not be parsed): show time waited so far.
                from datetime import datetime, timezone
                start_dt = _parse_sql_ts(brd_started)
                waited = None
                if start_dt is not None:
                    waited = int(
                        (datetime.now(timezone.utc) - start_dt).total_seconds()
                    )
                dur = _fmt_minutes(waited) if waited is not None else "\u2026"
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
                )
    lines.append(bar)
    lines.append(
        f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
        f"{fmt_cost(total_cost)}"
    )
    if done:
        # Wall-clock time is created_at -> updated_at of the task row; "?" when
        # either timestamp is missing or unparseable.
        wall = _duration_seconds(task["created_at"], task["updated_at"])
        wall_str = _fmt_minutes(wall) if wall is not None else "?"
        review_str = _fmt_minutes(review_seconds) if review_seconds else ""
        lines.append(
            f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
            f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
            f"\u0442\u0432\u043e\u0451 {review_str}"
        )
        link = _done_link(task_id, task["work_item_id"])
        if link:
            lines.append(link)
    return "\n".join(lines)
def _done_link(task_id: int, work_item_id) -> str | None:
"""Build the final '🔗 PR #n · 📦 Внедрено' line. Never raises -> None."""
try:
from .config import settings
from .db import get_db
conn = get_db()
row = conn.execute(
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
if not row:
return None
repo, branch = row["repo"], row["branch"]
pr_part = None
try:
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "all", "head": branch},
headers=headers, timeout=5,
)
if resp.status_code == 200:
prs = resp.json()
if prs:
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
except Exception:
pr_part = None
parts = []
if pr_part:
parts.append(pr_part)
parts.append("\U0001f4e6 Внедрено") # ORCH-042 (BR-12): was "deployed"
return " \u00b7 ".join(parts)
except Exception:
return None
def update_task_tracker(task_id: int):
    """Render the live tracker for a task and push it to Telegram. Never raises.

    The mode comes from ``settings.tracker_mode`` (stripped, lower-cased; an
    empty value means "edit"). Only the exact value "bump" selects bump mode;
    anything else is handled as edit mode (ORCH-042). Both modes keep the
    "one card per task" invariant and send at most ONE new message per call.

    bump:
        Best-effort delete of the stored message (the result never gates the
        send), then a silent sendMessage; ``tracker_message_id`` is re-pointed
        only when the send returned an id, so a failed send never wipes it.

    edit:
        With a stored message id, editMessageText it. EDIT_OK and
        EDIT_NOT_MODIFIED finish the call; EDIT_FAILED (transient) also
        finishes it WITHOUT sending, to avoid duplicate / orphaned trackers.
        Only EDIT_GONE, or having no stored id at all, leads to a fresh silent
        sendMessage whose id is then stored.

    Every send uses ``disable_notification=True`` so the tracker never pings.
    """
    try:
        from .db import get_tracker_message_id, set_tracker_message_id

        text = render_task_tracker(task_id)
        mode = (_get_settings().tracker_mode or "edit").strip().lower()
        mid = get_tracker_message_id(task_id)

        if mode == "bump":
            # Re-create the card at the bottom of the chat: drop the old one
            # first; its outcome does NOT gate the send below (BR-6).
            if mid is not None:
                delete_telegram(mid)
        elif mid is not None:
            outcome = edit_telegram(mid, text)
            if outcome in (EDIT_OK, EDIT_NOT_MODIFIED):
                # Updated in place (or nothing to change) -> no new message.
                return
            if outcome == EDIT_FAILED:
                # Transient -> keep the pointer; redraws on the next transition.
                logger.debug(
                    f"update_task_tracker({task_id}): edit failed transiently, "
                    f"keeping message {mid}"
                )
                return
            # EDIT_GONE -> the stored message no longer exists; replace it.

        # Shared tail: send a fresh silent card and store its id only when the
        # send succeeded (None = no creds / transient -> pointer untouched).
        fresh_mid = send_telegram(text, disable_notification=True)
        if fresh_mid is not None:
            set_tracker_message_id(task_id, fresh_mid)
    except Exception as e:
        logger.warning(f"update_task_tracker({task_id}) failed: {e}")
# --------------------------------------------------------------------------- #
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
# --------------------------------------------------------------------------- #
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
    """Log a stage transition, then redraw the live tracker.

    No separate Telegram message is sent; when ``agent`` is given the log line
    also names the launched agent.
    """
    work_item_id = _get_work_item_id(task_id)
    launched = f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" if agent else ""
    logger.info(f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" + launched)
    update_task_tracker(task_id)
def notify_agent_started(run_id: int, agent: str, task_id: int):
    """Log that an agent was launched, then redraw the tracker.

    No separate Telegram message is sent. The tracker refresh is skipped when
    ``task_id`` is falsy.
    """
    label = _get_work_item_id(task_id)
    logger.info(f"\U0001f680 {label}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
    if not task_id:
        return
    update_task_tracker(task_id)
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
    """Log how an agent run ended, then redraw the tracker.

    This helper only logs and refreshes; it sends no message of its own (the
    agent-FAILED alert is sent separately by the launcher via send_telegram).
    Exit code 0 logs a success line (with whole minutes when ``duration_s`` is
    truthy), -9 logs the timeout-kill line, anything else the failure line.
    Without a ``task_id`` the work item is logged as "?" and no refresh happens.
    """
    if task_id:
        work_item_id = _get_work_item_id(task_id)
    else:
        work_item_id = "?"

    if exit_code == 0:
        took = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
        summary = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{took}"
    elif exit_code == -9:
        summary = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)"
    else:
        summary = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
    logger.info(summary)

    if not task_id:
        return
    update_task_tracker(task_id)
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
    """Log the outcome of a QG check; log-only, no Telegram message.

    A pass is logged at INFO, a failure (with ``reason``) at WARNING. Neither
    sends a message or refreshes the tracker from here.
    """
    work_item_id = _get_work_item_id(task_id)
    if not passed:
        logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
        return
    logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
    """Log a failed QG check at WARNING level; log-only, no Telegram message.

    ``stage`` is accepted for the callers' signature but does not appear in
    the log line. Rollbacks / deploy failures are alerted elsewhere by their
    own send_telegram calls.
    """
    logger.warning(
        f"\u26a0\ufe0f {_get_work_item_id(task_id)}: QG {check} \u2014 failed: {reason}"
    )
# ORCH-017: hosts that are not clickable off the deploy box. A Plane web-base
# resolving to one of these (the plane_api_url loopback default) means "no usable
# browser URL" -> the Plane link is omitted rather than emitted broken (ADR-001 Р-3).
_LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "0.0.0.0", "::1"})
def _is_loopback_base(url: str) -> bool:
"""True if the URL's host is a loopback/local address (not clickable off-host).
Empty/garbage URLs count as loopback (i.e. unusable) so callers omit the link.
"""
if not url:
return True
try:
from urllib.parse import urlparse
host = (urlparse(url).hostname or "").lower()
return (not host) or host in _LOOPBACK_HOSTS
except Exception:
return True
def _get_task_link_fields(task_id: int):
    """ORCH-017: read ``(repo, branch, plane_issue_id)`` for a task. Never raises.

    Returns:
        The three column values of the task row, or ``(None, None, None)``
        when the row is missing or anything fails (the failure is logged at
        WARNING), so link building can degrade gracefully (AC-6).
    """
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT repo, branch, plane_issue_id FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            # Close even when the query raises, so a failed lookup does not
            # leak the connection.
            conn.close()
        if not row:
            return None, None, None
        return row["repo"], row["branch"], row["plane_issue_id"]
    except Exception as e:
        logger.warning(f"_get_task_link_fields({task_id}) failed: {e}")
        return None, None, None
def _build_brd_link(repo, branch, work_item_id) -> str | None:
    """ORCH-017: HTML link to the work item's 01-brd.md in the Gitea branch view.

    The base is ``gitea_public_url`` (falling back to ``gitea_url``) without a
    trailing slash and the owner is ``gitea_owner`` (AC-1/AC-3). Returns None
    when the base, owner, repo, branch or work item id is missing. The href is
    passed through ``html.escape`` as defence in depth (AC-7).
    """
    cfg = _get_settings()
    raw_base = getattr(cfg, "gitea_public_url", "") or getattr(cfg, "gitea_url", "")
    base = raw_base.rstrip("/")
    owner = getattr(cfg, "gitea_owner", "")
    if not all((base, owner, repo, branch, work_item_id)):
        return None
    target = (
        f"{base}/{owner}/{repo}/src/branch/{branch}"
        f"/docs/work-items/{work_item_id}/01-brd.md"
    )
    href = html.escape(target, quote=True)
    return (
        f'<a href="{href}">'
        f"\U0001f4c4 Открыть BRD</a>"
    )
def _plane_issue_url(repo, plane_issue_id, project_id=None) -> str | None:
    """ORCH-067 (Р-5): browser URL of a Plane issue, or None if it can't be built.

    Shape: ``{web_base}/{workspace}/projects/{project_id}/issues/{issue_id}/``
    where web_base is ``plane_web_url`` (falling back to ``plane_api_url``)
    without a trailing slash. None is returned when the base, the workspace
    slug or the issue id is missing, when the base is a loopback address, or
    when no project id is available. An explicit ``project_id`` wins;
    otherwise it is resolved from ``repo`` through
    ``projects.get_project_by_repo``. Never raises.
    """
    try:
        cfg = _get_settings()
        configured = getattr(cfg, "plane_web_url", "") or getattr(cfg, "plane_api_url", "")
        web_base = configured.rstrip("/")
        workspace = getattr(cfg, "plane_workspace_slug", "")
        if not web_base or not workspace or not plane_issue_id:
            return None
        if _is_loopback_base(web_base):
            return None

        if not project_id:
            # No explicit project -> look it up by repo; failures mean "none".
            try:
                from .projects import get_project_by_repo

                project = get_project_by_repo(repo) if repo else None
            except Exception:
                project = None
            if project:
                project_id = getattr(project, "plane_project_id", "")
            else:
                project_id = ""

        if project_id:
            return (
                f"{web_base}/{workspace}/projects/{project_id}/issues/{plane_issue_id}/"
            )
        return None
    except Exception:
        return None
def _build_plane_issue_link(repo, plane_issue_id) -> str | None:
    """ORCH-017: HTML link to the Plane issue page, or None when unusable.

    The link text is '✅ Задача в Plane' ("Task in Plane"); the URL and all of
    its guards come from ``_plane_issue_url``.
    """
    target = _plane_issue_url(repo, plane_issue_id)
    if target:
        href = html.escape(target, quote=True)
        return (
            f'<a href="{href}">'
            f"✅ Задача в Plane</a>"
        )
    return None
def plane_issue_link(work_item_id, plane_issue_id=None, project_id=None, repo=None) -> str:
    """ORCH-067 (Р-5): clickable issue number for cards and alerts.

    Returns ``<a href=...>LABEL</a>`` when ``_plane_issue_url`` yields a URL,
    otherwise just LABEL. LABEL is ``html.escape(str(work_item_id))``, or an
    empty string when ``work_item_id`` is None. Never raises: any error while
    building the link falls back to LABEL.
    """
    if work_item_id is None:
        label = ""
    else:
        label = html.escape(str(work_item_id))
    try:
        target = _plane_issue_url(repo, plane_issue_id, project_id)
        if target:
            return f'<a href="{html.escape(target, quote=True)}">{label}</a>'
    except Exception:
        pass
    return label
def link_for(work_item_id, task_id=None) -> str:
    """ORCH-067 (Р-6): clickable issue number for callers that only hold a
    ``work_item_id`` (or a ``task_id``). Never raises.

    Resolves ``(repo, plane_issue_id)`` from the DB — by ``task_id`` when
    given, else from the newest task row with that ``work_item_id`` — and
    delegates to ``plane_issue_link``. A failed lookup is logged at DEBUG and
    the link degrades to the escaped number.

    Returns:
        The HTML produced by ``plane_issue_link``; for a falsy
        ``work_item_id`` its escaped string form ('' for None).
    """
    if not work_item_id:
        return html.escape(str(work_item_id)) if work_item_id is not None else ""
    repo = None
    plane_issue_id = None
    try:
        from .db import get_db

        conn = get_db()
        try:
            if task_id is not None:
                row = conn.execute(
                    "SELECT repo, plane_issue_id FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
            else:
                row = conn.execute(
                    "SELECT repo, plane_issue_id FROM tasks WHERE work_item_id=? "
                    "ORDER BY id DESC LIMIT 1",
                    (work_item_id,),
                ).fetchone()
        finally:
            # Close even when the query raises, so a failed lookup does not
            # leak the connection.
            conn.close()
        if row:
            repo = row["repo"]
            plane_issue_id = row["plane_issue_id"]
    except Exception as e:
        logger.debug(f"link_for({work_item_id}) DB lookup failed: {e}")
    return plane_issue_link(work_item_id, plane_issue_id=plane_issue_id, repo=repo)
# --------------------------------------------------------------------------- #
# ORCH-067: Plane status label for the live card (layer B indication, ADR Р-1)
# --------------------------------------------------------------------------- #
# Offline stage -> Plane status label. Names are the final ORCH-066 status names
# (_PLANE_NAME_TO_KEY). Pure / deterministic — derived entirely from tasks.stage
# (+ the brd-clock for In Review), NEVER from the network.
_STAGE_STATUS_LABEL = {
"created": "To Analyse",
"analysis": "Analysis",
"architecture": "Architecture",
"development": "Development",
"review": "Code-Review",
"testing": "Testing",
# Russian tail: "waiting for Confirm Deploy".
"deploy": "⏸️ Awaiting Deploy — ожидание Confirm Deploy",
"done": "Done",
}
# Fallback label for an unknown stage or any error while resolving the label.
_DEFAULT_STATUS_LABEL = "To Analyse"
# Shown for the analysis stage while the BRD review has started but not ended.
# Russian tail: "waiting for BRD approval".
_IN_REVIEW_LABEL = (
"⏸️ In Review — ожидание "
"согласования BRD"
)
# Live-overlay branch labels (keys not derivable offline from tasks.stage).
_LIVE_BRANCH_LABELS = {
# Russian tail: "clarifications needed".
"needs_input": "❓ Needs Input — нужны уточнения",
"blocked": "Blocked",
"rejected": "Rejected",
"cancelled": "Cancelled",
"deploying": "Deploying",
"monitoring": "Monitoring after Deploy",
}
# ORCH-066 (Р-1 anti-false-positive): deploying/monitoring alias their BASE key's
# UUID on a project without dedicated statuses (enduro). Override is applied ONLY
# when the project really defined a SEPARATE UUID for the branch key.
# Maps branch key -> the base key it aliases.
_LIVE_BRANCH_BASE = {
"deploying": "in_progress",
"monitoring": "done",
}
def _row_get(row, key, default=None):
    """Safe sqlite3.Row / dict / object getter. Never raises.

    ``row[key]`` is tried first. If the container is subscriptable but simply
    has no such key/column (``KeyError`` / ``IndexError``), ``default`` is
    returned right away. Only when subscription fails for another reason
    (e.g. a plain object that is not subscriptable) is ``getattr`` used.

    Keeping the two paths apart matters: falling back to ``getattr`` after a
    missing key would hand back bound methods such as ``dict.keys`` or
    ``sqlite3.Row.keys`` for keys that happen to share a method name.

    Args:
        row: mapping, ``sqlite3.Row``, arbitrary object, or ``None``.
        key: key / column name / attribute name to read.
        default: value returned when nothing can be read.

    Returns:
        The stored value, or ``default`` when it is absent or unreadable.
    """
    try:
        return row[key]
    except LookupError:
        # Subscriptable, but the key/column is absent: that is a definitive
        # "missing", so do not reinterpret the key as an attribute name.
        return default
    except Exception:
        # Not subscriptable (or subscription is broken): try attributes below.
        pass
    try:
        return getattr(row, key, default)
    except Exception:
        return default
def plane_status_label(task_row) -> str:
    """ORCH-067 (Р-1, layer 1): Plane status label shown in the card header.

    Computed from the task row alone: no network access, and every exception
    is converted into the default label, so the function never raises.

    * The stage is read from ``task_row["stage"]``; an empty/missing stage is
      treated as ``"created"``.
    * For stage ``"analysis"`` with ``brd_review_started_at`` set and
      ``brd_review_ended_at`` not set, the ``⏸️ In Review`` label is returned.
    * Otherwise the label comes from ``_STAGE_STATUS_LABEL`` (which also holds
      the ``⏸️ Awaiting Deploy`` label for stage ``"deploy"``), falling back to
      the default label for unknown stages (AC-7, AC-8).

    Branch statuses that cannot be told apart offline (Needs Input / Blocked /
    …) are handled by ``_live_plane_branch_override``, not here.
    """
    fallback = _DEFAULT_STATUS_LABEL
    try:
        stage = _row_get(task_row, "stage") or "created"
        if stage != "analysis":
            return _STAGE_STATUS_LABEL.get(stage, fallback)

        # "analysis" is the only stage with an offline sub-state: the BRD
        # review clock has been started and not yet stopped.
        clock_started = _row_get(task_row, "brd_review_started_at")
        clock_stopped = _row_get(task_row, "brd_review_ended_at")
        brd_pending = bool(clock_started) and not clock_stopped
        if brd_pending:
            return _IN_REVIEW_LABEL
        return _STAGE_STATUS_LABEL.get(stage, fallback)
    except Exception:
        return fallback
# ORCH-067 (Р-3): per-issue TTL cache of the live state uuid.
# Shape: {plane_issue_id: (time.monotonic() stamp of the read, state uuid)}.
_LIVE_STATE_CACHE: dict[str, tuple] = {}


def _live_state_uuid_cached(plane_issue_id, project_id):
    """ORCH-067 (Р-3/Р-4): live Plane state uuid for the render path, TTL-cached.

    A cache entry no older than ``tracker_live_status_ttl_s`` (default 60) is
    returned without touching the network. Otherwise ``fetch_issue_state`` is
    called once with the ``tracker_live_status_timeout_s`` timeout (default 3)
    and whatever it returns is stored under ``plane_issue_id`` and returned.

    Any exception is logged at debug level and mapped to ``None``.
    """
    try:
        import time

        settings = _get_settings()
        max_age = getattr(settings, "tracker_live_status_ttl_s", 60)
        now = time.monotonic()

        entry = _LIVE_STATE_CACHE.get(plane_issue_id)
        is_fresh = entry is not None and (now - entry[0]) <= max_age
        if is_fresh:
            return entry[1]

        from .plane_sync import fetch_issue_state

        state_uuid = fetch_issue_state(
            plane_issue_id,
            project_id,
            timeout=getattr(settings, "tracker_live_status_timeout_s", 3),
        )
        # The entry is stamped with the time taken before the fetch started.
        _LIVE_STATE_CACHE[plane_issue_id] = (now, state_uuid)
        return state_uuid
    except Exception as exc:
        logger.debug(f"_live_state_uuid_cached({plane_issue_id}) failed: {exc}")
        return None
def _live_plane_branch_override(repo, plane_issue_id, base_label) -> str:
    """ORCH-067 (Р-1 layer 2 / Р-2): best-effort live-status overlay.

    Resolves the branch statuses that ``tasks.stage`` cannot express offline
    (Needs Input / Blocked / Rejected / Cancelled / Deploying / Monitoring
    after Deploy) by comparing the LIVE Plane state uuid (TTL-cached read)
    with the project's state uuids.

    ``base_label`` is returned unchanged when the ``tracker_live_status``
    setting is off, when ``plane_issue_id`` is empty, when no project or
    ``plane_project_id`` can be resolved for ``repo``, when the live uuid is
    unavailable, when no branch status matches, or on any exception (logged at
    debug level). The function never raises.
    """
    try:
        settings = _get_settings()
        overlay_enabled = getattr(settings, "tracker_live_status", True)
        if not (overlay_enabled and plane_issue_id):
            return base_label

        # Project lookup is isolated: any failure just means "no project".
        try:
            from .projects import get_project_by_repo

            project = get_project_by_repo(repo) if repo else None
        except Exception:
            project = None

        project_id = ""
        if project:
            project_id = getattr(project, "plane_project_id", "")
        if not project_id:
            return base_label

        live_uuid = _live_state_uuid_cached(plane_issue_id, project_id)
        if not live_uuid:
            return base_label

        from .plane_sync import get_project_states

        states = get_project_states(project_id)
        for branch_key, branch_label in _LIVE_BRANCH_LABELS.items():
            branch_uuid = states.get(branch_key)
            if branch_uuid and branch_uuid == live_uuid:
                base_key = _LIVE_BRANCH_BASE.get(branch_key)
                # deploying/monitoring may merely alias their base key on this
                # project (enduro / no dedicated status); that is not a real
                # branch, so it must not override the offline label.
                is_alias = bool(base_key) and states.get(base_key) == branch_uuid
                if not is_alias:
                    return branch_label
        return base_label
    except Exception as exc:
        logger.debug(f"_live_plane_branch_override failed: {exc}")
        return base_label
def _card_status_label(task_row, repo=None, plane_issue_id=None) -> str:
    """ORCH-067: card status label = offline core, optionally refined live.

    Precedence (Р-1): when the offline core yields ``⏸️ In Review`` (driven by
    the brd-clock, authoritative) it is returned as-is and the live overlay is
    not consulted. For every other offline label the overlay gets a chance to
    replace it with a branch status. Any exception results in the default
    label, so the function never raises (AC-9).
    """
    try:
        offline_label = plane_status_label(task_row)
        if offline_label != _IN_REVIEW_LABEL:
            return _live_plane_branch_override(repo, plane_issue_id, offline_label)
        return offline_label
    except Exception:
        return _DEFAULT_STATUS_LABEL
def notify_approve_requested(task_id: int):
    """ALERT (separate, notifying): BRD/TZ/AC are ready, ask for Approved in Plane.

    Steps, in order:
      1. Start the BRD-review clock (``mark_brd_review_started``); a failure is
         logged as a warning and does not stop the alert.
      2. Build the call-to-action text with the issue link.
      3. Append the BRD and Plane issue links when they can be built.
      4. Log the text, refresh the live tracker (so the
         '⏸️ Ревью БРД · твоё время ⏳' — "BRD review · your time" — line
         appears), then send the text as a separate Telegram message.
    """
    work_item_id = _get_work_item_id(task_id)

    try:
        from .db import mark_brd_review_started

        mark_brd_review_started(task_id)
    except Exception as exc:
        logger.warning(f"notify_approve_requested: brd clock start failed: {exc}")

    issue_ref = link_for(work_item_id, task_id)
    text = (
        f"\U0001f4cb {issue_ref}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
        "\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
        "\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
    )

    # ORCH-017: add direct links to the BRD doc (Gitea) and the Plane issue so
    # the reviewer can open both from the ping. Each link is built on its own
    # and dropped when empty; the whole step is guarded so it can NEVER break
    # the alert (AC-1/AC-2/AC-6). The result is still exactly one notifying
    # message (AC-5) and the call to action above always stays (AC-4).
    try:
        repo, branch, plane_issue_id = _get_task_link_fields(task_id)
        candidates = (
            _build_brd_link(repo, branch, work_item_id),
            _build_plane_issue_link(repo, plane_issue_id),
        )
        present_links = list(filter(None, candidates))
        if present_links:
            text = text + "\n\n" + "\n".join(present_links)
    except Exception as exc:
        logger.warning(f"notify_approve_requested({task_id}): link build failed: {exc}")

    logger.info(text)
    update_task_tracker(task_id)
    send_telegram(text)  # separate, notifying
def notify_done(task_id: int):
    """Task completion: log it and refresh the tracker.

    No separate Telegram message is sent here; the tracker refresh is what
    brings the card to its final ГОТОВО ("DONE") form.
    """
    logger.info(
        f"\U0001f389 {_get_work_item_id(task_id)}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
    )
    update_task_tracker(task_id)
def notify_error(task_id: int, error: str):
    """ALERT (separate, notifying): report a task error.

    ORCH-067 (req 4): with a truthy ``task_id`` the issue number is rendered
    through ``link_for`` (clickable Plane link); without one the literal
    ``system`` is used. The error text is html-escaped so it cannot break the
    ``<a>`` markup under parse_mode=HTML (AC-14). The message is logged at
    error level and then sent to Telegram.
    """
    if task_id:
        work_item_id = _get_work_item_id(task_id)
        issue_ref = link_for(work_item_id, task_id)
    else:
        work_item_id = "system"
        issue_ref = html.escape(work_item_id)

    escaped_error = html.escape(str(error))
    text = f"\U0001f534 {issue_ref}: ERROR \u2014 {escaped_error}"
    logger.error(text)
    send_telegram(text)  # separate, notifying