7.7 KiB
DEV TASK: Telegram-уведомления из Orchestrator
Статус: Ready for dev
Проект: multi-agent
Приоритет: High
Контекст
Orchestrator ведёт полный журнал всех этапов (events, tasks, agent_runs), но не отправляет уведомления в Telegram. Сейчас notifications.py только пишет в stdout (docker logs).
Слава хочет получать детерминированные уведомления в Telegram — без моделей, без галлюцинаций. Чистый скрипт: событие → HTTP POST → Telegram Bot API.
Цель
Добавить отправку уведомлений в Telegram при каждом значимом событии конвейера.
Что должно приходить в Telegram
| Событие | Формат сообщения |
|---|---|
| Stage change | 🔄 ET-007: analysis → architecture (запущен architect) |
| Agent started | 🚀 ET-007: analyst запущен (run_id=26) |
| Agent finished OK | ✅ ET-007: analyst завершил (8 мин, exit_code=0) |
| Agent failed | ❌ ET-007: developer упал (exit_code=1) |
| Agent timeout | ⏰ ET-007: developer убит по таймауту (30 мин) |
| QG passed | ✅ ET-007: QG check_ci_green — passed |
| QG failed | ⚠️ ET-007: QG check_ci_green — failed: CI state: pending |
| Analyst requests approve | 📋 ET-007: BRD/ТЗ/AC готовы. Жду :approved: в Plane |
| Task done | 🎉 ET-007: задача завершена! PR merged. |
| Error | 🔴 ET-007: ERROR — Failed to launch agent |
Реализация
1. Добавить переменные в .env
Файл: /home/slin/repos/orchestrator/.env
ORCH_TELEGRAM_BOT_TOKEN=8298776127:AAGGbOYY7arq_WLD6vo8kJ8B1Ns7lTf6NT8
ORCH_TELEGRAM_CHAT_ID=126472752
2. Добавить в config.py
Файл: src/config.py
# Telegram notifications
telegram_bot_token: str = ""
telegram_chat_id: str = ""
3. Обновить notifications.py
Файл: src/notifications.py
Добавить функцию send_telegram(text: str) и вызывать её из всех notify_*:
import httpx
from .config import settings
TELEGRAM_URL = f"https://api.telegram.org/bot{settings.telegram_bot_token}/sendMessage"
def send_telegram(text: str):
"""Send notification to Telegram. Fire-and-forget, never raises."""
if not settings.telegram_bot_token or not settings.telegram_chat_id:
return
try:
httpx.post(
TELEGRAM_URL,
json={
"chat_id": settings.telegram_chat_id,
"text": text,
"parse_mode": "HTML",
"disable_notification": False,
},
timeout=5,
)
except Exception:
pass # Never crash orchestrator due to notification failure
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
"""Log and notify stage transition."""
# Get work_item_id from DB
work_item_id = _get_work_item_id(task_id)
msg = f"🔄 {work_item_id}: {old_stage} → {new_stage}"
if agent:
msg += f" (запущен {agent})"
logger.info(msg)
send_telegram(msg)
def notify_agent_started(run_id: int, agent: str, task_id: int):
"""Notify agent launch."""
work_item_id = _get_work_item_id(task_id)
msg = f"🚀 {work_item_id}: {agent} запущен (run_id={run_id})"
logger.info(msg)
send_telegram(msg)
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
"""Notify agent completion."""
work_item_id = _get_work_item_id(task_id) if task_id else "?"
if exit_code == 0:
dur = f" ({duration_s // 60} мин)" if duration_s else ""
msg = f"✅ {work_item_id}: {agent} завершил{dur}"
elif exit_code == -9:
msg = f"⏰ {work_item_id}: {agent} убит по таймауту (30 мин)"
else:
msg = f"❌ {work_item_id}: {agent} упал (exit_code={exit_code})"
logger.info(msg)
send_telegram(msg)
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
"""Notify QG failure."""
work_item_id = _get_work_item_id(task_id)
msg = f"⚠️ {work_item_id}: QG {check} — failed: {reason}"
logger.warning(msg)
send_telegram(msg)
def notify_error(task_id: int, error: str):
"""Notify error."""
work_item_id = _get_work_item_id(task_id) if task_id else "system"
msg = f"🔴 {work_item_id}: ERROR — {error}"
logger.error(msg)
send_telegram(msg)
def notify_approve_requested(task_id: int):
"""Notify that analyst requests :approved:."""
work_item_id = _get_work_item_id(task_id)
msg = f"📋 {work_item_id}: BRD/ТЗ/AC готовы. Жду :approved: в Plane"
logger.info(msg)
send_telegram(msg)
def notify_done(task_id: int):
"""Notify task completion."""
work_item_id = _get_work_item_id(task_id)
msg = f"🎉 {work_item_id}: задача завершена!"
logger.info(msg)
send_telegram(msg)
def _get_work_item_id(task_id: int) -> str:
"""Get work_item_id from DB by task_id."""
try:
from .db import get_db
conn = get_db()
row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row["work_item_id"] if row and row["work_item_id"] else f"task-{task_id}"
except Exception:
return f"task-{task_id}"
4. Обновить вызовы в launcher.py
Файл: src/agents/launcher.py
- В
launch()после записи в БД:notify_agent_started(run_id, agent, task_id) - В
_monitor_agent()после обновления exit_code:notify_agent_finished(run_id, agent, exit_code, task_id, duration_s) - В
_try_advance_stage()при запросе approve:notify_approve_requested(task_id)
5. Обновить вызовы в plane.py и gitea.py
Убедиться, что все notify_stage_change(), notify_qg_failure(), notify_error() передают task_id (уже передают).
Файлы для изменения
| Файл | Изменения |
|---|---|
.env |
Добавить ORCH_TELEGRAM_BOT_TOKEN, ORCH_TELEGRAM_CHAT_ID |
src/config.py |
Добавить telegram_bot_token, telegram_chat_id |
src/notifications.py |
Полная переработка: send_telegram() + обновлённые notify_* |
src/agents/launcher.py |
Вызовы notify_agent_started, notify_agent_finished, notify_approve_requested |
Ограничения
- Никаких моделей — только детерминированный код
- Fire-and-forget — если Telegram недоступен, orchestrator НЕ падает
- parse_mode: HTML — для форматирования (bold, code)
- НЕ менять порт 8500
- НЕ ломать существующий цикл
Команды проверки
# 1. Тест отправки
docker exec orchestrator python -c "
from src.notifications import send_telegram
send_telegram('🧪 Тест уведомлений из Orchestrator')
print('sent')
"
# 2. Health
curl -s http://localhost:8500/health
Результат
После деплоя Слава получает в Telegram каждое значимое событие конвейера в реальном времени. Без моделей, без галлюцинаций — чистый детерминированный скрипт.