Устраняет «замёрзшие» осиротевшие карточки live-трекера и доделывает строку
стадии/итоговое время.
G1 — зачистка сирот: аддитивный леджер tracker_messages(task_id, message_id,
created_at, deleted_at) + хелперы add/get_open/mark_deleted в src/db.py. bump
теперь удаляет ВСЕ незакрытые mid задачи (а не только скаляр
tasks.tracker_message_id, сохранён как BC-указатель). Новый mid в леджер только
при успешном send (BR-6); transient-delete остаётся для ретрая; «already
gone»/>48ч закрывается. Корень бага — скалярный учёт, терявший ссылку при
гонке/delete-fail+send-ok (ADR-001 G0).
G3 — deploy-цикл: ключ confirm_deploy в _LIVE_BRANCH_LABELS (без base-alias).
BR-EFF — эффорт в строке: колонка agent_runs.effort (_ensure_column,
идемпотентно), стамп фактического resolve_agent_effort в launcher._spawn в
момент запуска; рендер `· {model} · {effort}`, пустой → суффикс опускается.
BR-G5 — честное время: done-строка `⏱️ Агенты Σ · твоё {review~cap} · общее с
ожиданием {wall}` — три независимых подписанных метрики; кап
tracker_brd_review_cap_s (ORCH_TRACKER_BRD_REVIEW_CAP_S, дефолт 2ч, маркер ~).
Инварианты: STAGE_TRANSITIONS/QG_CHECKS/стадии без изменений; миграции
аддитивны/идемпотентны (enduro не трогается); never-raise,
disable_notification, plane_issue_link (ORCH-067), disable_web_page_preview
(ORCH-080) сохранены; src/reconciler.py не эродирован (ORCH-086 на месте).
Тесты: tests/test_notifications_orphans.py (TC-01..05 + never-raise),
tests/test_tracker_effort_time.py (TC-06/11..15 + confirm_deploy),
tests/test_launcher.py::TestEffortStamp (TC-09/10). Доки: CLAUDE.md
(§Нотификации), docs/architecture/README.md (Notifications), CHANGELOG.md.
Refs: ORCH-087
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1160 lines
48 KiB
Python
1160 lines
48 KiB
Python
"""Notifications and logging for orchestrator events.
|
||
|
||
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
|
||
messages per task (agent start / finish / stage transition / QG-pending / tech
|
||
noise), the orchestrator now maintains ONE live tracker message per task that is
|
||
edited in place (editMessageText) on every stage transition. Only events that
|
||
NEED Slava's attention are sent as SEPARATE, notifying messages:
|
||
|
||
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
|
||
* deploy failed / rolled back — send_telegram from launcher/engine
|
||
* agent failed (exit_code != 0) — send_telegram from launcher
|
||
* task error (notify_error)
|
||
|
||
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
|
||
agent-start, agent-finish and QG-pending no longer emit their own messages — they
|
||
just refresh the tracker (or are log-only).
|
||
"""
|
||
|
||
import html
|
||
import logging
|
||
|
||
import httpx
|
||
|
||
logger = logging.getLogger("orchestrator")

# Cached settings object, filled on first use by _get_settings(). The import of
# .config is deferred to call time to avoid circular imports at module level.
_settings = None


def _get_settings():
    """Return the shared ``settings`` object, importing ``.config`` on first use."""
    global _settings
    if _settings is not None:
        return _settings
    from .config import settings as loaded

    _settings = loaded
    return _settings
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Low-level Telegram primitives
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def send_telegram(text: str, disable_notification: bool = False):
    """Send a message to the configured Telegram chat. Fire-and-forget, never raises.

    Args:
        text: HTML-formatted body (sent with ``parse_mode=HTML``).
        disable_notification: deliver silently when True (the live tracker
            uses this so it never pings).

    Returns:
        The Telegram ``message_id`` on success, else None: missing
        credentials, an ``ok: false`` API reply, or a network / parse error.
        Callers that track the message (the tracker) store it; legacy callers
        ignore it.
    """
    s = _get_settings()
    if not s.telegram_bot_token or not s.telegram_chat_id:
        return None
    try:
        url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
        resp = httpx.post(
            url,
            json={
                "chat_id": s.telegram_chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_notification": disable_notification,
                # ORCH-080: suppress the Plane link-preview banner that Telegram
                # would otherwise expand under every tracker card / notification.
                "disable_web_page_preview": True,
            },
            timeout=5,
        )
        data = resp.json()
        if data.get("ok"):
            return data["result"]["message_id"]
        # ok:false (bad HTML, chat not found, rate limit, ...). This used to be
        # silent, which made a lost alert / tracker card undiagnosable.
        logger.warning(
            f"send_telegram: rejected by Telegram ({data.get('description')!r})"
        )
    except Exception as e:
        # Never crash the orchestrator due to a notification failure, but
        # leave a trace. The bot token is part of the request URL, so scrub it
        # in case the exception text echoes that URL.
        try:
            detail = str(e).replace(str(s.telegram_bot_token), "***")
        except Exception:
            detail = type(e).__name__
        logger.warning(f"send_telegram: transient error: {detail}")
    return None
|
||
|
||
|
||
# Lower-cased fragments of Telegram ``description`` strings meaning a
# deleteMessage target is already gone or can never be deleted (>48h, already
# deleted, invalid id). These count as "no longer our problem": the caller
# goes on to send a fresh card. They are NOT transient failures.
_DELETE_GONE_MARKERS = (
    "message to delete not found",
    "message can't be deleted",
    "message_id_invalid",
)


def delete_telegram(message_id: int) -> bool:
    """Best-effort deleteMessage for ``message_id``. Never raises.

    Returns:
        True when the message is no longer there after the call. This covers
        both "deleted now" and Telegram reporting it is already gone / can't be
        deleted.

        False when the credentials are missing, or on a transient failure
        (network / timeout / non-JSON reply / unrecognised error) where the old
        message may still be alive.
    """
    s = _get_settings()
    if not (s.telegram_bot_token and s.telegram_chat_id):
        # Missing credentials: nothing was deleted (same no-op path as the
        # other helpers).
        return False
    try:
        endpoint = f"https://api.telegram.org/bot{s.telegram_bot_token}/deleteMessage"
        payload = {"chat_id": s.telegram_chat_id, "message_id": message_id}
        reply = httpx.post(endpoint, json=payload, timeout=5).json()
        if reply.get("ok"):
            return True
        # ok:false -> decide between "already gone" (expected) and "failed".
        desc = str(reply.get("description") or "").lower()
        gone = any(marker in desc for marker in _DELETE_GONE_MARKERS)
        if gone:
            logger.debug(
                f"delete_telegram(mid={message_id}): already gone ({desc!r})"
            )
        else:
            # Unknown 400 / 5xx: the old message may still be alive.
            logger.warning(
                f"delete_telegram(mid={message_id}): delete failed ({desc!r})"
            )
        return gone
    except Exception as e:
        # Network / timeout: transient, the old message may still be alive.
        logger.warning(f"delete_telegram(mid={message_id}): transient error: {e}")
        return False
|
||
|
||
|
||
# Outcome codes returned by edit_telegram so update_task_tracker can decide:
#   EDIT_OK            edit applied -> nothing else to do
#   EDIT_NOT_MODIFIED  Telegram says the text is identical (400 "message is not
#                      modified" / "exactly the same") -> success, NO new message
#   EDIT_GONE          the original can't be edited (deleted / too old /
#                      invalid id) -> caller falls back to a NEW message
#   EDIT_FAILED        transient / unknown failure (network / timeout / 5xx /
#                      unknown 400, or no credentials) -> caller must NOT send a
#                      new message (avoids duplicate trackers)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"

# Lower-cased description fragments: the message is permanently un-editable
# (gone / orphaned) -> fall back to a fresh message.
_GONE_MARKERS = (
    "message to edit not found",
    "message can't be edited",
    "message_id_invalid",
)

# Lower-cased description fragments: "nothing changed" -> success, never a
# duplicate.
_NOT_MODIFIED_MARKERS = (
    "message is not modified",
    "exactly the same",
)


def edit_telegram(message_id: int, text: str) -> str:
    """editMessageText on ``message_id`` with new HTML ``text``. Never raises.

    Returns one of the EDIT_* codes so the caller can tell "applied" /
    "unchanged" / "message gone" / "transient failure" apart. Only EDIT_GONE
    should lead to sending a NEW message.
    """
    s = _get_settings()
    if not (s.telegram_bot_token and s.telegram_chat_id):
        return EDIT_FAILED
    try:
        endpoint = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
        payload = {
            "chat_id": s.telegram_chat_id,
            "message_id": message_id,
            "text": text,
            "parse_mode": "HTML",
            # ORCH-080: suppress the Plane link-preview banner (see send_telegram).
            "disable_web_page_preview": True,
        }
        reply = httpx.post(endpoint, json=payload, timeout=5).json()
        if reply.get("ok"):
            return EDIT_OK

        desc = str(reply.get("description") or "").lower()

        def _matches(markers):
            return any(fragment in desc for fragment in markers)

        # "Not modified" is checked first: an identical re-render (e.g. a
        # repeat review cycle) is a no-op, NOT a reason for a new message.
        if _matches(_NOT_MODIFIED_MARKERS):
            logger.debug(
                f"edit_telegram(mid={message_id}): not modified, skipping"
            )
            return EDIT_NOT_MODIFIED
        if _matches(_GONE_MARKERS):
            logger.warning(
                f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
                f"will fall back to a new message"
            )
            return EDIT_GONE
        # Anything else is unknown -> treat as transient, do NOT duplicate.
        logger.warning(
            f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
        )
        return EDIT_FAILED
    except Exception as e:
        # Network / timeout / non-JSON 5xx body -> transient, do NOT duplicate.
        logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
        return EDIT_FAILED
|
||
|
||
|
||
def _get_work_item_id(task_id: int) -> str:
    """Return the task's work_item_id, or ``task-<id>`` as a fallback. Never raises.

    The fallback is used when the task row is missing, its work_item_id is
    empty, or the DB lookup fails for any reason.
    """
    fallback = f"task-{task_id}"
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT work_item_id FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            # Release the connection even when the query raises (it used to
            # leak on that path).
            conn.close()
        return row[0] if row and row[0] else fallback
    except Exception:
        return fallback
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Live task tracker
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
# Pipeline stages shown in the tracker, in display order, as
# (stage key, display label, agent) tuples. The agent is the one whose
# agent_runs rows describe that stage's work.
# The BRD approval line is NOT an agent stage. It is the human approve gate,
# rendered between Analysis and Architecture from the task's brd_review_*
# timestamps (label: _BRD_LABEL).
# ORCH-042 (BR-11): display labels are Russian. Stage keys (analysis, ...) and
# agent names (analyst, ...) are wired to _STAGE_ACTIVE_AGENT, last_done and
# the DB and must not change; only the label (2nd element) is cosmetic.
_TRACKER_STAGES = [
    ("analysis", "Анализ", "analyst"),  # Analysis
    ("architecture", "Архитектура", "architect"),  # Architecture
    ("development", "Разработка", "developer"),  # Development
    ("review", "Код ревью", "reviewer"),  # Code review
    ("testing", "Тестирование", "tester"),  # Testing
    ("deploy", "Внедрение", "deployer"),  # Deployment
]

# ORCH-042 (BR-9): label of the human BRD approve-gate line
# (formerly "Ревью БРД").
_BRD_LABEL = "Подтверждение BRD"

# Map a pipeline stage -> the agent that is RUNNING while the task sits in it
# (development is entered after architecture finishes, etc.). Used to render
# the "🔄 <Stage> … идёт" line for the currently-active stage. Derived from
# _TRACKER_STAGES so the two tables can never drift apart.
_STAGE_ACTIVE_AGENT = {key: agent for key, _label, agent in _TRACKER_STAGES}
|
||
|
||
|
||
def _fmt_minutes(seconds) -> str:
    """Render a duration as whole minutes for the tracker.

    ``None``, non-numeric input and anything <= 0 give '0м'. 1..59 seconds give
    '<1м'. Otherwise the result is the floor of the minutes, e.g. 125 -> '2м'.
    """
    try:
        total = int(seconds or 0)
    except (TypeError, ValueError):
        total = 0
    if total <= 0:
        return "0м"
    return "<1м" if total < 60 else f"{total // 60}\u043c"
|
||
|
||
|
||
def _parse_sql_ts(ts):
    """Turn a SQLite UTC timestamp into an aware UTC datetime, or None.

    Accepts 'YYYY-MM-DD HH:MM:SS' and the ISO 'T'-separated form. Anything
    after the first 19 characters (fractions, zone suffix) is ignored. Falsy
    or unparseable input returns None.
    """
    if not ts:
        return None
    from datetime import datetime, timezone

    layouts = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S")
    for layout in layouts:
        try:
            naive = datetime.strptime(str(ts)[:19], layout)
        except (ValueError, TypeError):
            continue
        return naive.replace(tzinfo=timezone.utc)
    return None
|
||
|
||
|
||
def _duration_seconds(started, finished):
    """Whole seconds from ``started`` to ``finished``, clamped at 0.

    Both arguments are SQL timestamps (see _parse_sql_ts). Returns None if
    either one is missing or unparseable.
    """
    begin = _parse_sql_ts(started)
    end = _parse_sql_ts(finished)
    if begin is None or end is None:
        return None
    elapsed = int((end - begin).total_seconds())
    return elapsed if elapsed > 0 else 0
|
||
|
||
|
||
def _capped_review_str(review_seconds) -> str:
    """ORCH-087 (BR-G5): the human BRD-review time, capped to hide anomalous stalls.

    No review window (falsy input) gives '0m' (rendered '0м'). When
    ``tracker_brd_review_cap_s`` is > 0 and the review lasted longer, the cap is
    shown prefixed with '~'. The '~' signals that the real value was longer and
    is likely an open brd_review clock left by a desync (In Review -> Backlog),
    not genuine human time (ORCH-087: 392m). A cap <= 0 disables capping.
    Never raises.
    """
    try:
        if not review_seconds:
            return "0м"
        actual = int(review_seconds)
        try:
            limit = int(
                getattr(_get_settings(), "tracker_brd_review_cap_s", 0) or 0
            )
        except Exception:
            limit = 0
        capped = 0 < limit < actual
        return f"~{_fmt_minutes(limit)}" if capped else _fmt_minutes(actual)
    except Exception:
        if not review_seconds:
            return "0м"
        return _fmt_minutes(review_seconds)
|
||
|
||
|
||
def _run_effort(run) -> str:
    """ORCH-087 (BR-EFF): effort tag for a stage line, '' when unknown. Never raises.

    Reads the ``effort`` stamped on the agent_runs row (the real --effort sent
    at launch by launcher._spawn). A NULL / empty value returns '', so the
    caller omits the effort suffix (AC-E.4). That covers historical rows
    predating the column and launches without --effort. Any error while
    reading also returns ''.
    """
    try:
        value = _row_get(run, "effort")
        if not value:
            return ""
        return str(value)
    except Exception:
        return ""
|
||
|
||
|
||
def render_task_tracker(task_id: int) -> str:
    """Build the full live-tracker text for a task from the DB (stateless render).

    Pulls the task header (work_item_id, title, stage, repo, plane_issue_id),
    every agent_runs row and the BRD-review timestamps, then renders:

      - the header (Plane issue link + title) and a '📍 <status>' line,
      - '⏳ ждёт <deps>' when the task is blocked by unfinished dependencies,
      - one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost>[ · <model>][ · <effort>]'
        line per finished stage (latest finished, successful run per agent),
      - the '✅/⏸️ Подтверждение BRD <dur> · твоё время[ ⏳]' line right after
        Analysis (✅ once the approve-gate passed, ⏸️+⏳ while waiting),
      - a '🔄 <Stage> … идёт' line (or '· попытка N' on a re-run) for the
        active, in-progress stage,
      - the '💰 <in>↓ / <out>↑ · <cost>' totals across ALL runs,
      - on done: '⏱️ Агенты .. · твоё .. · общее с ожиданием ..' plus the
        '🔗 PR / 📦' line when one can be built.

    An unknown task or a DB read error returns the minimal fallback
    ``task-<id>``. Errors while building the text afterwards are NOT swallowed
    here. update_task_tracker catches them, so a render bug never overwrites
    the card.
    """
    from datetime import datetime, timezone

    from .db import get_db
    from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name

    try:
        conn = get_db()
        try:
            task = conn.execute(
                "SELECT id, work_item_id, title, stage, created_at, updated_at, "
                "brd_review_started_at, brd_review_ended_at, repo, plane_issue_id "
                "FROM tasks WHERE id=?",
                (task_id,),
            ).fetchone()
            if not task:
                return f"task-{task_id}"
            runs = conn.execute(
                "SELECT agent, started_at, finished_at, exit_code, input_tokens, "
                "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, "
                "model, effort "
                "FROM agent_runs WHERE task_id=? ORDER BY id ASC",
                (task_id,),
            ).fetchall()
        finally:
            # Always release the connection, even when a query raises.
            conn.close()
    except Exception as e:
        logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
        return f"task-{task_id}"

    work_item_id = task["work_item_id"] or f"task-{task_id}"
    title = task["title"] or work_item_id
    stage = task["stage"] or "created"
    done = stage == "done"

    def _usage(r):
        # Token fields that _input_total() folds into the "input" figure.
        return {
            "input_tokens": r["input_tokens"],
            "cache_read_tokens": r["cache_read_tokens"],
            "cache_creation_tokens": r["cache_creation_tokens"],
        }

    # Latest finished, successful (exit_code 0 or NULL) run per agent. A stage
    # may have several runs on retry; its ✅ line shows the most recent one.
    last_done = {}
    agent_runs_by_agent = {}
    for r in runs:
        agent_runs_by_agent.setdefault(r["agent"], []).append(r)
        if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
            last_done[r["agent"]] = r

    # Totals across ALL runs: every input/output token and cost counts, and
    # the durations of runs with both timestamps are summed.
    total_in = 0
    total_out = 0
    total_cost = 0.0
    agent_seconds = 0
    for r in runs:
        total_in += _input_total(_usage(r))
        total_out += int(r["output_tokens"] or 0)
        total_cost += float(r["cost_usd"] or 0.0)
        d = _duration_seconds(r["started_at"], r["finished_at"])
        if d is not None:
            agent_seconds += d

    esc_title = html.escape(title)
    # ORCH-067 (req 3): the issue number in the header is a clickable link to
    # the Plane issue (degrades to the escaped number when there is no web URL).
    task_repo = _row_get(task, "repo")
    task_issue_id = _row_get(task, "plane_issue_id")
    num_html = plane_issue_link(work_item_id, plane_issue_id=task_issue_id, repo=task_repo)
    if done:
        header = f"\U0001f389 {num_html} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
    else:
        header = f"\U0001f6e0\ufe0f {num_html} \u00b7 {esc_title}"
    bar = "\u2501" * 22

    # ORCH-067 (req 2): a Plane-status line (model ORCH-066) under the header.
    # Fail-safe: any error degrades to the default label and never breaks the
    # render.
    try:
        status_label = _card_status_label(
            task, repo=task_repo, plane_issue_id=task_issue_id
        )
    except Exception:
        status_label = _DEFAULT_STATUS_LABEL
    status_line = f"\U0001f4cd {status_label}"
    lines = [header, status_line, bar]

    # ORCH-026 (B-4): waiting line for a task blocked by an unfinished declared
    # dependency, showing WHAT it waits on, so the single tracker card makes
    # the wait visible. Best effort: any error only drops the waiting line.
    if not done:
        try:
            from . import task_deps
            from .config import settings as cfg  # local alias: no global shadowing

            if getattr(cfg, "task_deps_enabled", False):
                ready, waiting_on = task_deps.is_task_ready(task_id)
                if not ready and waiting_on:
                    waits = ", ".join(link_for(w) for w in waiting_on)
                    lines.append(f"⏳ ждёт {waits}")
        except Exception as e:
            logger.debug(f"render_task_tracker({task_id}): deps check skipped: {e}")

    def _stage_line(label, run):
        """Render the ✅ line of one finished stage run."""
        in_tok = fmt_tokens(_input_total(_usage(run)))
        out_tok = fmt_tokens(run["output_tokens"])
        cost = fmt_cost(run["cost_usd"])
        dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
        model = short_model_name(run["model"])
        model_suffix = f" \u00b7 {model}" if model else ""
        # ORCH-087 (BR-EFF): the --effort stamped at launch in agent_runs.effort,
        # rendered after the model ("· opus-4-8 · xhigh"). Empty / NULL (e.g.
        # historical rows predating the column) -> the suffix is omitted, just
        # like the model suffix. There is no config-based fallback.
        effort = _run_effort(run)
        effort_suffix = f" \u00b7 {effort}" if effort else ""
        return (
            f"\u2705 {label:<13} {dur} \u00b7 "
            f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}{effort_suffix}"
        )

    # BRD approve-gate line: inserted right after Analysis once a review window
    # has been opened (brd_review_started_at set). Time = human review delta.
    brd_started = task["brd_review_started_at"]
    brd_ended = task["brd_review_ended_at"]
    review_seconds = _duration_seconds(brd_started, brd_ended)

    for stage_key, label, agent in _TRACKER_STAGES:
        run = last_done.get(agent)
        # The stage is "in progress" only when it is the task's current stage
        # AND its agent has an unfinished run (or none finished yet). A
        # finished run with nothing in flight shows the ✅ result, even while
        # the task still sits in that stage (just-finished snapshot).
        agent_runs = agent_runs_by_agent.get(agent, [])
        has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
        is_active_stage = (
            _STAGE_ACTIVE_AGENT.get(stage) == agent
            and stage == stage_key
            and (has_inflight or run is None)
        )
        if is_active_stage:
            # Live "🔄 … идёт" line. A 2nd+ run of this stage's agent means
            # the stage is being re-done (e.g. review -> development -> review).
            # Showing "попытка N" changes the text between cycles and shows
            # honestly that the stage is being re-worked.
            attempt = len(agent_runs)
            if attempt >= 2:
                lines.append(
                    f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
                    f"\u2026 \u0438\u0434\u0451\u0442"
                )
            else:
                lines.append(
                    f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
                )
        elif run is not None:
            lines.append(_stage_line(label, run))
        # else: not started yet -> not shown.

        if stage_key == "analysis" and brd_started:
            brd_label = f"{_BRD_LABEL:<13}"
            if review_seconds is not None:
                # ORCH-042 (BR-10): approve-gate passed -> ✅ (was ⏸️). The
                # still-waiting branch below keeps ⏸️ + ⏳.
                dur = _fmt_minutes(review_seconds)
                lines.append(
                    f"\u2705 {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
                )
            else:
                # Still waiting on the human (end not stamped yet): show the
                # time elapsed since the review started.
                start_dt = _parse_sql_ts(brd_started)
                waited = None
                if start_dt is not None:
                    waited = int(
                        (datetime.now(timezone.utc) - start_dt).total_seconds()
                    )
                dur = _fmt_minutes(waited) if waited is not None else "\u2026"
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
                )

    lines.append(bar)
    lines.append(
        f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
        f"{fmt_cost(total_cost)}"
    )

    if done:
        wall = _duration_seconds(task["created_at"], task["updated_at"])
        wall_str = _fmt_minutes(wall) if wall is not None else "?"
        review_str = _capped_review_str(review_seconds)
        # ORCH-087 (BR-G5): three INDEPENDENT, explicitly-labelled metrics; none
        # is presented as the sum of the others. Queue / wait pauses are not
        # logged, so wall != agents + review (the old "Всего {wall}" read like
        # a wrong sum).
        #   Агенты                = sum of agent_runs durations (main metric, T-1)
        #   твоё                  = human BRD review, capped (T-2)
        #   общее с ожиданием     = wall clock incl. queue/wait, NOT work (T-3)
        lines.append(
            f"\u23f1\ufe0f \u0410\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
            f"\u0442\u0432\u043e\u0451 {review_str} \u00b7 "
            f"\u043e\u0431\u0449\u0435\u0435 \u0441 \u043e\u0436\u0438\u0434\u0430\u043d\u0438\u0435\u043c {wall_str}"
        )
        link = _done_link(task_id, task["work_item_id"])
        if link:
            lines.append(link)

    return "\n".join(lines)
|
||
|
||
|
||
def _done_link(task_id: int, work_item_id) -> str | None:
|
||
"""Build the final '🔗 PR #n · 📦 Внедрено' line. Never raises -> None."""
|
||
try:
|
||
from .config import settings
|
||
from .db import get_db
|
||
conn = get_db()
|
||
row = conn.execute(
|
||
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
|
||
).fetchone()
|
||
conn.close()
|
||
if not row:
|
||
return None
|
||
repo, branch = row["repo"], row["branch"]
|
||
pr_part = None
|
||
try:
|
||
owner = settings.gitea_owner
|
||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||
resp = httpx.get(
|
||
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
|
||
params={"state": "all", "head": branch},
|
||
headers=headers, timeout=5,
|
||
)
|
||
if resp.status_code == 200:
|
||
prs = resp.json()
|
||
if prs:
|
||
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
|
||
except Exception:
|
||
pr_part = None
|
||
parts = []
|
||
if pr_part:
|
||
parts.append(pr_part)
|
||
parts.append("\U0001f4e6 Внедрено") # ORCH-042 (BR-12): was "deployed"
|
||
return " \u00b7 ".join(parts)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def update_task_tracker(task_id: int):
    """Render the task's live tracker and push it to Telegram. Never raises.

    The mode comes from ``Settings.tracker_mode`` (env ORCH_TRACKER_MODE),
    compared case-insensitively after stripping. "bump" selects bump mode;
    anything else means "edit" (ORCH-042). Both keep one card per task, and
    the card is always sent with disable_notification, so it never pings.

    edit (default):
        With a stored tracker_message_id, the message is edited in place. A
        fresh message is sent only when the stored one is truly gone
        (EDIT_GONE) or when no id is stored yet. "Not modified" and transient
        failures send nothing, which avoids duplicate / lagging cards.

    bump (ORCH-042, ORCH-087 BR-G1):
        The card is re-created at the bottom of the chat. Every still-open
        card of the task (tracker_messages ledger plus the scalar pointer) is
        deleted best-effort; delete results never gate the send (BR-6). Then
        one silent message is sent. Only a successful send is recorded in the
        ledger and re-points tracker_message_id, so a failed send never wipes
        the pointer.
    """
    try:
        from .db import (
            get_tracker_message_id, set_tracker_message_id,
            get_open_tracker_messages, add_tracker_message,
            mark_tracker_message_deleted,
        )

        text = render_task_tracker(task_id)
        mode = (_get_settings().tracker_mode or "edit").strip().lower()
        mid = get_tracker_message_id(task_id)

        if mode != "bump":
            # ---- edit mode (DEFAULT) ----
            if mid is not None:
                outcome = edit_telegram(mid, text)
                if outcome in (EDIT_OK, EDIT_NOT_MODIFIED):
                    return  # edited in place / nothing to change
                if outcome == EDIT_FAILED:
                    # Transient: keep the card, it is redrawn next transition.
                    logger.debug(
                        f"update_task_tracker({task_id}): edit failed transiently, "
                        f"keeping message {mid}"
                    )
                    return
                # EDIT_GONE: fall through and send a replacement card.
            fresh_mid = send_telegram(text, disable_notification=True)
            if fresh_mid is not None:
                set_tracker_message_id(task_id, fresh_mid)
            return

        # ---- bump mode ----
        # The ledger is the authoritative set of cards ever created, so any
        # reference the scalar lost (race / delete-fail+send-ok / restart) is
        # still reaped here.
        stale_mids = set()
        try:
            stale_mids.update(get_open_tracker_messages(task_id))
        except Exception as e:
            logger.warning(f"update_task_tracker({task_id}): ledger read failed: {e}")
        if mid is not None:
            # Cards sent before the ledger existed are only known via the scalar.
            stale_mids.add(mid)

        for old_mid in stale_mids:
            if not delete_telegram(old_mid):
                # Transient failure: leave it open in the ledger for the next bump.
                continue
            # Gone (deleted now, or already gone / >48h) -> close it in the ledger.
            try:
                mark_tracker_message_deleted(task_id, old_mid)
            except Exception as e:
                logger.warning(
                    f"update_task_tracker({task_id}): mark-deleted failed: {e}"
                )

        fresh_mid = send_telegram(text, disable_notification=True)
        if fresh_mid is None:
            # No creds / transient send failure: pointer and ledger untouched,
            # no duplicate in this call; the card is redrawn next transition.
            return
        # R-3 / BR-6: only a successfully sent card enters the ledger / pointer.
        try:
            add_tracker_message(task_id, fresh_mid)
        except Exception as e:
            logger.warning(
                f"update_task_tracker({task_id}): ledger insert failed: {e}"
            )
        set_tracker_message_id(task_id, fresh_mid)
    except Exception as e:
        logger.warning(f"update_task_tracker({task_id}) failed: {e}")
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
    """Log a stage transition, then refresh the live tracker card.

    No separate Telegram message is sent; the tracker refresh is the only
    chat-visible effect. ``agent``, when given, is appended to the log line.
    """
    suffix = f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" if agent else ""
    label = _get_work_item_id(task_id)
    logger.info(f"\U0001f504 {label}: {old_stage} \u2192 {new_stage}{suffix}")
    update_task_tracker(task_id)
|
||
|
||
|
||
def notify_agent_started(run_id: int, agent: str, task_id: int):
    """Log an agent launch; refresh the tracker when a task id is given.

    No separate Telegram message is sent.
    """
    label = _get_work_item_id(task_id)
    logger.info(f"\U0001f680 {label}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
    if not task_id:
        return
    update_task_tracker(task_id)
|
||
|
||
|
||
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
||
"""Log agent completion and refresh the tracker (no separate message).
|
||
|
||
The agent-FAILED alert (exit_code != 0) is still sent separately by the
|
||
launcher via send_telegram; this helper itself only logs + refreshes.
|
||
"""
|
||
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
||
if exit_code == 0:
|
||
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
||
msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}"
|
||
elif exit_code == -9:
|
||
msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)"
|
||
else:
|
||
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
||
logger.info(msg)
|
||
if task_id:
|
||
update_task_tracker(task_id)
|
||
|
||
|
||
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
    """Log the outcome of a quality-gate check (log-only, no Telegram message).

    Kept for existing callers. A pass is logged at INFO and a failure, with
    ``reason``, at WARNING. The tracker shows QG outcomes only through the
    stage transition that follows.
    """
    label = _get_work_item_id(task_id)
    if not passed:
        logger.warning(f"\u26a0\ufe0f {label}: QG {check} \u2014 failed: {reason}")
        return
    logger.info(f"\u2705 {label}: QG {check} \u2014 passed")
|
||
|
||
|
||
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
    """Log a quality-gate failure at WARNING level (log-only).

    QG-pending / QG-failed events are no longer sent as separate messages.
    Real rollbacks and deploy failures are alerted by dedicated send_telegram
    calls elsewhere. ``stage`` is accepted for caller compatibility but does
    not appear in the log line.
    """
    logger.warning(
        f"\u26a0\ufe0f {_get_work_item_id(task_id)}: QG {check} \u2014 failed: {reason}"
    )
|
||
|
||
|
||
# ORCH-017: hosts that are not clickable off the deploy box. A Plane web-base
# resolving to one of these (the plane_api_url loopback default) means "no usable
# browser URL" -> the Plane link is omitted rather than emitted broken (ADR-001 Р-3).
_LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "0.0.0.0", "::1"})


def _is_loopback_base(url: str) -> bool:
    """Return True if the URL's host is not reachable from another machine.

    The following count as loopback / unusable:

    * empty or unparsable URLs, and URLs without a host (e.g. missing scheme);
    * names in ``_LOOPBACK_HOSTS`` (case-insensitive, trailing dot ignored);
    * ``*.localhost`` names (RFC 6761 reserves them for loopback);
    * any IP literal in a loopback range (``127.0.0.0/8``, ``::1``) or the
      unspecified address (``0.0.0.0``, ``::``).

    Callers omit the link in all of these cases. Never raises.
    """
    if not url:
        return True
    try:
        import ipaddress
        from urllib.parse import urlparse

        # urlparse().hostname is already lower-cased and bracket-stripped for
        # IPv6. Drop a trailing root dot so "localhost." matches as well.
        host = (urlparse(url).hostname or "").lower().rstrip(".")
        if not host or host in _LOOPBACK_HOSTS or host.endswith(".localhost"):
            return True
        try:
            addr = ipaddress.ip_address(host)
        except ValueError:
            # A regular DNS name (not an IP literal) -> assume it is reachable.
            return False
        # Covers all of 127.0.0.0/8 (e.g. Debian's 127.0.1.1), ::1, 0.0.0.0, ::.
        return addr.is_loopback or addr.is_unspecified
    except Exception:
        return True
|
||
|
||
|
||
def _get_task_link_fields(task_id: int):
    """ORCH-017: read ``(repo, branch, plane_issue_id)`` for a task. Never raises.

    Returns ``(None, None, None)`` when the row is missing or on any error, so
    link building can degrade gracefully (AC-6). The DB connection is closed
    even when the query itself fails, so no connection leaks on error.
    """
    try:
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT repo, branch, plane_issue_id FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            conn.close()
        if not row:
            return None, None, None
        return row["repo"], row["branch"], row["plane_issue_id"]
    except Exception as e:
        logger.warning(f"_get_task_link_fields({task_id}) failed: {e}")
        return None, None, None
|
||
|
||
|
||
def _build_brd_link(repo, branch, work_item_id) -> str | None:
    """ORCH-017: HTML anchor to the task's ``01-brd.md`` in the Gitea branch view.

    Follows the branch-view pattern used in src/usage.py:

    * base = ``gitea_public_url``, falling back to ``gitea_url``, with any
      trailing slash removed;
    * owner = ``gitea_owner`` (AC-1/AC-3).

    Returns None when base, owner, repo, branch or work_item_id is empty. The
    href is ``html.escape``-d as defence-in-depth, even though every part comes
    from config or the DB (AC-7).
    """
    settings = _get_settings()
    public_base = getattr(settings, "gitea_public_url", "")
    base = (public_base or getattr(settings, "gitea_url", "")).rstrip("/")
    owner = getattr(settings, "gitea_owner", "")
    if not all((base, owner, repo, branch, work_item_id)):
        return None

    brd_url = (
        f"{base}/{owner}/{repo}/src/branch/{branch}"
        f"/docs/work-items/{work_item_id}/01-brd.md"
    )
    href = html.escape(brd_url, quote=True)
    return f'<a href="{href}">\U0001f4c4 Открыть BRD</a>'
|
||
|
||
|
||
def _plane_issue_url(repo, plane_issue_id, project_id=None) -> str | None:
    """ORCH-067 (Р-5): build the Plane issue browser URL, or None if that is impossible.

    This is the single source of the URL and its guards. It is shared by
    ``plane_issue_link`` (link text = issue number) and
    ``_build_plane_issue_link`` (link text = '✅ Задача в Plane'), so project
    resolution and the loopback guard live in ONE place (ORCH-017 Р-2).

    Shape: ``{web_base}/{workspace}/projects/{project_id}/issues/{issue_id}/``.

    * ``web_base`` is ``plane_web_url``, falling back to ``plane_api_url``.
      A loopback base counts as "no web URL" -> None.
    * An explicit ``project_id`` wins; otherwise it is resolved from ``repo``.

    Never raises.
    """
    try:
        cfg = _get_settings()
        web_base = (
            getattr(cfg, "plane_web_url", "") or getattr(cfg, "plane_api_url", "")
        ).rstrip("/")
        workspace = getattr(cfg, "plane_workspace_slug", "")
        has_parts = web_base and workspace and plane_issue_id
        if not has_parts or _is_loopback_base(web_base):
            return None

        if not project_id:
            # Resolve the Plane project from the repo; any failure -> no project.
            try:
                from .projects import get_project_by_repo

                project = get_project_by_repo(repo) if repo else None
            except Exception:
                project = None
            project_id = getattr(project, "plane_project_id", "") if project else ""
            if not project_id:
                return None

        return f"{web_base}/{workspace}/projects/{project_id}/issues/{plane_issue_id}/"
    except Exception:
        return None
|
||
|
||
|
||
def _build_plane_issue_link(repo, plane_issue_id) -> str | None:
    """ORCH-017: HTML anchor to the Plane issue page, or None when no URL is usable.

    The link text is '✅ Задача в Plane' ("task in Plane"). The URL and its
    loopback / workspace / project guards come from ``_plane_issue_url``
    (ADR-001 Р-2 / ORCH-067 Р-5). The href is html-escaped.
    """
    target = _plane_issue_url(repo, plane_issue_id)
    if target:
        return f'<a href="{html.escape(target, quote=True)}">✅ Задача в Plane</a>'
    return None
|
||
|
||
|
||
def plane_issue_link(work_item_id, plane_issue_id=None, project_id=None, repo=None) -> str:
    """ORCH-067 (Р-5): clickable issue number for cards and alerts.

    Returns ``<a href=...>ORCH-NNN</a>`` when ``_plane_issue_url`` can build a
    Plane web URL, otherwise just the escaped number. The link text is always
    ``html.escape(str(work_item_id))``, or ``""`` for None. The href uses the
    same loopback / workspace / project guards as the '✅ Задача в Plane' link.
    Never raises.
    """
    text = "" if work_item_id is None else html.escape(str(work_item_id))
    try:
        href = _plane_issue_url(repo, plane_issue_id, project_id)
        return f'<a href="{html.escape(href, quote=True)}">{text}</a>' if href else text
    except Exception:
        return text
|
||
|
||
|
||
def link_for(work_item_id, task_id=None) -> str:
    """ORCH-067 (Р-6): clickable issue number for alert points that hold only a
    ``work_item_id`` (or ``task_id``).

    Resolves ``(repo, plane_issue_id)`` from the DB and delegates to
    ``plane_issue_link``:

    * by ``task_id`` when given;
    * otherwise from the latest task row for ``work_item_id``.

    On any missing data the result is ``html.escape(work_item_id)``. The DB
    connection is closed even when the lookup query fails. Never raises.
    """
    if not work_item_id:
        return html.escape(str(work_item_id)) if work_item_id is not None else ""
    repo = None
    plane_issue_id = None
    try:
        from .db import get_db

        conn = get_db()
        try:
            if task_id is not None:
                row = conn.execute(
                    "SELECT repo, plane_issue_id FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
            else:
                row = conn.execute(
                    "SELECT repo, plane_issue_id FROM tasks WHERE work_item_id=? "
                    "ORDER BY id DESC LIMIT 1",
                    (work_item_id,),
                ).fetchone()
        finally:
            # Close on both paths; previously a failing query leaked the connection.
            conn.close()
        if row:
            repo = row["repo"]
            plane_issue_id = row["plane_issue_id"]
    except Exception as e:
        logger.debug(f"link_for({work_item_id}) DB lookup failed: {e}")
    return plane_issue_link(work_item_id, plane_issue_id=plane_issue_id, repo=repo)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
# ORCH-067: Plane status label for the live card (layer B indication, ADR Р-1)
# --------------------------------------------------------------------------- #

# Offline mapping tasks.stage -> Plane status label. The labels are the final
# ORCH-066 status names (_PLANE_NAME_TO_KEY). The mapping is pure and
# deterministic: it depends only on tasks.stage (plus the BRD-review clock for
# In Review) and never on the network.
_STAGE_STATUS_LABEL = dict(
    created="To Analyse",
    analysis="Analysis",
    architecture="Architecture",
    development="Development",
    review="Code-Review",
    testing="Testing",
    deploy="⏸️ Awaiting Deploy — ожидание Confirm Deploy",
    done="Done",
)

# Fallback label for an unknown or unreadable stage.
_DEFAULT_STATUS_LABEL = "To Analyse"

# Label shown while the BRD review is pending ("waiting for BRD approval").
_IN_REVIEW_LABEL = "⏸️ In Review — ожидание согласования BRD"
|
||
|
||
# Live-overlay labels for branch statuses that cannot be derived offline from
# tasks.stage. Keep the key order stable: the overlay scans keys in insertion
# order and the first matching one wins when two keys share a state UUID.
_LIVE_BRANCH_LABELS = dict(
    needs_input="❓ Needs Input — нужны уточнения",
    blocked="Blocked",
    rejected="Rejected",
    cancelled="Cancelled",
    # ORCH-087 (G3, ADR-001 Р-4): close the deploy cycle on the card. The
    # confirm_deploy logical key already exists in plane_sync (ORCH-059). It is
    # drawn as a real, dedicated status (deliberately NOT listed as a base
    # alias) whenever its UUID is live in Plane, so the card can show
    # Awaiting Deploy → Deploying → Confirm Deploy → Monitoring → Done.
    confirm_deploy="⏳ Confirm Deploy — подтвердите прод-деплой",
    deploying="Deploying",
    monitoring="Monitoring after Deploy",
)

# ORCH-066 (Р-1 anti-false-positive): on a project without dedicated statuses
# (enduro), deploying/monitoring reuse the UUID of their BASE key. The branch
# override applies ONLY when the project defines a SEPARATE UUID for the
# branch key.
_LIVE_BRANCH_BASE = dict(
    deploying="in_progress",
    monitoring="done",
)
|
||
|
||
|
||
def _row_get(row, key, default=None):
    """Read ``key`` from a sqlite3.Row, a mapping, or a plain object. Never raises.

    * Subscriptable rows (sqlite3.Row, dict, ...) are read by key. A missing
      key (KeyError / IndexError) yields ``default``.
    * Rows that cannot be subscripted by ``key`` (objects, namespaces, None)
      fall back to ``getattr(row, key, default)``.

    Keeping these two paths separate prevents a missing key that collides with
    a method name (``keys``, ``items``, ``get``, ...) from returning that bound
    method instead of ``default``.
    """
    try:
        return row[key]
    except (KeyError, IndexError):
        # Mapping-like row that simply lacks this column/key.
        return default
    except Exception:
        # Not subscriptable by this key -> treat it as an attribute bag.
        try:
            return getattr(row, key, default)
        except Exception:
            return default
|
||
|
||
|
||
def plane_status_label(task_row) -> str:
    """ORCH-067 (Р-1, layer 1): current Plane status label for the card header.

    The label is computed purely from the task row: no network access, no
    exceptions (any failure -> ``_DEFAULT_STATUS_LABEL``). A missing or empty
    stage is treated as ``"created"``.

    * ``⏸️ In Review`` is returned when the stage is ``analysis`` and the
      BRD-review clock has started but not ended.
    * Every other stage is looked up in ``_STAGE_STATUS_LABEL``, which also
      yields ``⏸️ Awaiting Deploy`` for ``deploy``.

    Both of these work offline (AC-7, AC-8). Branch statuses that cannot be
    told apart offline (Needs Input / Blocked / …) are drawn by
    ``_live_plane_branch_override``.
    """
    try:
        stage = _row_get(task_row, "stage") or "created"
        if stage != "analysis":
            return _STAGE_STATUS_LABEL.get(stage, _DEFAULT_STATUS_LABEL)
        review_started = _row_get(task_row, "brd_review_started_at")
        review_ended = _row_get(task_row, "brd_review_ended_at")
        if review_started and not review_ended:
            return _IN_REVIEW_LABEL
        return _STAGE_STATUS_LABEL.get(stage, _DEFAULT_STATUS_LABEL)
    except Exception:
        return _DEFAULT_STATUS_LABEL
|
||
|
||
|
||
# ORCH-067 (Р-3): per-issue TTL cache of the live Plane state UUID, shaped
# {plane_issue_id: (monotonic_timestamp, uuid)}.
_LIVE_STATE_CACHE: dict[str, tuple] = {}


def _live_state_uuid_cached(plane_issue_id, project_id):
    """ORCH-067 (Р-3/Р-4): TTL-cached read of an issue's live Plane state UUID.

    Used on the render path. ``fetch_issue_state`` is called at most once per
    issue per ``tracker_live_status_ttl_s`` (default 60), with the short
    ``tracker_live_status_timeout_s`` (default 3). Every successful fetch is
    cached, including an empty/None result. Never raises; returns None on any
    failure.
    """
    try:
        import time

        cfg = _get_settings()
        ttl = getattr(cfg, "tracker_live_status_ttl_s", 60)
        now = time.monotonic()

        cached = _LIVE_STATE_CACHE.get(plane_issue_id)
        if cached is not None and now - cached[0] <= ttl:
            return cached[1]

        from .plane_sync import fetch_issue_state

        fresh_uuid = fetch_issue_state(
            plane_issue_id,
            project_id,
            timeout=getattr(cfg, "tracker_live_status_timeout_s", 3),
        )
        _LIVE_STATE_CACHE[plane_issue_id] = (now, fresh_uuid)
        return fresh_uuid
    except Exception as e:
        logger.debug(f"_live_state_uuid_cached({plane_issue_id}) failed: {e}")
        return None
|
||
|
||
|
||
def _live_plane_branch_override(repo, plane_issue_id, base_label) -> str:
    """ORCH-067 (Р-1 layer 2 / Р-2): best-effort live-status overlay for the card.

    Some branch statuses cannot be told apart from ``tasks.stage`` offline:
    Needs Input / Blocked / Rejected / Cancelled / Confirm Deploy / Deploying /
    Monitoring after Deploy. This overlay draws them from the LIVE Plane
    status (short timeout, TTL cache).

    A branch key whose UUID merely equals its base key's UUID (see
    ``_LIVE_BRANCH_BASE``) is not treated as a real branch. The following all
    fall back to ``base_label`` (the offline label):

    * the ``tracker_live_status`` kill-switch is off;
    * the issue or project id is missing;
    * there is no live UUID, or no branch key matches it;
    * any error.

    The pipeline is never blocked, and the function never raises.
    """
    try:
        settings = _get_settings()
        live_enabled = getattr(settings, "tracker_live_status", True)
        if not live_enabled or not plane_issue_id:
            return base_label

        try:
            from .projects import get_project_by_repo

            project = get_project_by_repo(repo) if repo else None
        except Exception:
            project = None
        project_id = getattr(project, "plane_project_id", "") if project else ""
        if not project_id:
            return base_label

        live_uuid = _live_state_uuid_cached(plane_issue_id, project_id)
        if not live_uuid:
            return base_label

        from .plane_sync import get_project_states

        states = get_project_states(project_id)
        for branch_key, branch_label in _LIVE_BRANCH_LABELS.items():
            branch_uuid = states.get(branch_key)
            if branch_uuid and branch_uuid == live_uuid:
                base_key = _LIVE_BRANCH_BASE.get(branch_key)
                # On projects without dedicated statuses (enduro), deploying and
                # monitoring just alias their base key -> not a real branch.
                aliases_base = bool(base_key) and states.get(base_key) == branch_uuid
                if not aliases_base:
                    return branch_label
        return base_label
    except Exception as e:
        logger.debug(f"_live_plane_branch_override failed: {e}")
        return base_label
|
||
|
||
|
||
def _card_status_label(task_row, repo=None, plane_issue_id=None) -> str:
    """ORCH-067: full card status label = offline core plus the live overlay.

    Precedence (Р-1): an offline ``⏸️ In Review`` result (from the BRD clock)
    is authoritative and returned as-is, without consulting the live overlay.
    Any other offline label may be replaced by a live branch status. Never
    raises; returns ``_DEFAULT_STATUS_LABEL`` on any failure (AC-9).
    """
    try:
        offline_label = plane_status_label(task_row)
        return (
            offline_label
            if offline_label == _IN_REVIEW_LABEL
            else _live_plane_branch_override(repo, plane_issue_id, offline_label)
        )
    except Exception:
        return _DEFAULT_STATUS_LABEL
|
||
|
||
|
||
def notify_approve_requested(task_id: int):
    """ALERT (separate, notifying): BRD/TZ/AC are ready; ask to move Plane to Approved.

    Steps, in order:

    1. Start the BRD-review clock. A failure here is only logged.
    2. Build the ping text and append the BRD (Gitea) and Plane issue links
       when they can be built.
    3. Log the text and refresh the tracker, so the
       '⏸️ Ревью БРД · твоё время ⏳' ("BRD review · your time") line appears.
    4. Send exactly one notifying Telegram message.
    """
    wid = _get_work_item_id(task_id)

    try:
        from .db import mark_brd_review_started

        mark_brd_review_started(task_id)
    except Exception as e:
        logger.warning(f"notify_approve_requested: brd clock start failed: {e}")

    sections = [
        f"\U0001f4cb {link_for(wid, task_id)}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
        f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
        f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
    ]

    # ORCH-017: direct links to the BRD doc (Gitea) and the Plane issue, so the
    # reviewer can open both straight from the ping. Each link is built
    # independently and omitted when its data is missing. Building is
    # defensive and can NEVER break the alert (AC-1/AC-2/AC-6). The call to
    # action above is always kept (AC-4), and it stays one message (AC-5).
    try:
        repo, branch, plane_issue_id = _get_task_link_fields(task_id)
        candidate_links = (
            _build_brd_link(repo, branch, wid),
            _build_plane_issue_link(repo, plane_issue_id),
        )
        present = [anchor for anchor in candidate_links if anchor]
        if present:
            sections.append("\n".join(present))
    except Exception as e:
        logger.warning(f"notify_approve_requested({task_id}): link build failed: {e}")

    msg = "\n\n".join(sections)
    logger.info(msg)
    update_task_tracker(task_id)
    send_telegram(msg)  # separate, notifying
|
||
|
||
|
||
def notify_done(task_id: int):
    """Log task completion and refresh the tracker into its final "done" (ГОТОВО) form.

    No separate Telegram ping is sent.
    """
    logger.info(
        f"\U0001f389 {_get_work_item_id(task_id)}: "
        f"\u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
    )
    update_task_tracker(task_id)
|
||
|
||
|
||
def notify_error(task_id: int, error: str):
    """ALERT (separate, notifying): report a task error.

    ORCH-067 (req 4): with a ``task_id`` the issue number is a clickable Plane
    link via ``link_for``, which falls back to the raw number. Without one, the
    label is the escaped word ``system``. The error text is html-escaped so it
    cannot break the ``<a>`` markup under parse_mode=HTML (AC-14). The message
    is logged at ERROR and also sent to Telegram.
    """
    if task_id:
        work_item_id = _get_work_item_id(task_id)
        issue_ref = link_for(work_item_id, task_id)
    else:
        work_item_id = "system"
        issue_ref = html.escape(work_item_id)
    msg = f"\U0001f534 {issue_ref}: ERROR \u2014 {html.escape(str(error))}"
    logger.error(msg)
    send_telegram(msg)  # separate, notifying
|