Files
wiki/tasks/orchestrator/DEV_TASK_TELEGRAM_NOTIFICATIONS.md
2026-06-02 21:00:01 +03:00

7.7 KiB
Raw Permalink Blame History

DEV TASK: Telegram-уведомления из Orchestrator

Статус: Ready for dev
Проект: multi-agent
Приоритет: High


Контекст

Orchestrator ведёт полный журнал всех этапов (events, tasks, agent_runs), но не отправляет уведомления в Telegram. Сейчас notifications.py только пишет в stdout (docker logs).

Слава хочет получать детерминированные уведомления в Telegram — без моделей, без галлюцинаций. Чистый скрипт: событие → HTTP POST → Telegram Bot API.


Цель

Добавить отправку уведомлений в Telegram при каждом значимом событии конвейера.


Что должно приходить в Telegram

Событие Формат сообщения
Stage change 🔄 ET-007: analysis → architecture (запущен architect)
Agent started 🚀 ET-007: analyst запущен (run_id=26)
Agent finished OK ✅ ET-007: analyst завершил (8 мин, exit_code=0)
Agent failed ❌ ET-007: developer упал (exit_code=1)
Agent timeout ⏰ ET-007: developer убит по таймауту (30 мин)
QG passed ✅ ET-007: QG check_ci_green — passed
QG failed ⚠️ ET-007: QG check_ci_green — failed: CI state: pending
Analyst requests approve 📋 ET-007: BRD/ТЗ/AC готовы. Жду :approved: в Plane
Task done 🎉 ET-007: задача завершена! PR merged.
Error 🔴 ET-007: ERROR — Failed to launch agent

Реализация

1. Добавить переменные в .env

Файл: /home/slin/repos/orchestrator/.env

ORCH_TELEGRAM_BOT_TOKEN=8298776127:AAGGbOYY7arq_WLD6vo8kJ8B1Ns7lTf6NT8
ORCH_TELEGRAM_CHAT_ID=126472752

2. Добавить в config.py

Файл: src/config.py

# Telegram notifications
telegram_bot_token: str = ""
telegram_chat_id: str = ""

3. Обновить notifications.py

Файл: src/notifications.py

Добавить функцию send_telegram(text: str) и вызывать её из всех notify_*:

import httpx
from .config import settings

TELEGRAM_URL = f"https://api.telegram.org/bot{settings.telegram_bot_token}/sendMessage"

def send_telegram(text: str):
    """Send notification to Telegram. Fire-and-forget, never raises."""
    if not settings.telegram_bot_token or not settings.telegram_chat_id:
        return
    try:
        httpx.post(
            TELEGRAM_URL,
            json={
                "chat_id": settings.telegram_chat_id,
                "text": text,
                "parse_mode": "HTML",
                "disable_notification": False,
            },
            timeout=5,
        )
    except Exception:
        pass  # Never crash orchestrator due to notification failure


def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
    """Log and notify stage transition."""
    # Get work_item_id from DB
    work_item_id = _get_work_item_id(task_id)
    msg = f"🔄 {work_item_id}: {old_stage}{new_stage}"
    if agent:
        msg += f" (запущен {agent})"
    logger.info(msg)
    send_telegram(msg)


def notify_agent_started(run_id: int, agent: str, task_id: int):
    """Notify agent launch."""
    work_item_id = _get_work_item_id(task_id)
    msg = f"🚀 {work_item_id}: {agent} запущен (run_id={run_id})"
    logger.info(msg)
    send_telegram(msg)


def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
    """Notify agent completion."""
    work_item_id = _get_work_item_id(task_id) if task_id else "?"
    if exit_code == 0:
        dur = f" ({duration_s // 60} мин)" if duration_s else ""
        msg = f"✅ {work_item_id}: {agent} завершил{dur}"
    elif exit_code == -9:
        msg = f"⏰ {work_item_id}: {agent} убит по таймауту (30 мин)"
    else:
        msg = f"❌ {work_item_id}: {agent} упал (exit_code={exit_code})"
    logger.info(msg)
    send_telegram(msg)


def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
    """Notify QG failure."""
    work_item_id = _get_work_item_id(task_id)
    msg = f"⚠️ {work_item_id}: QG {check} — failed: {reason}"
    logger.warning(msg)
    send_telegram(msg)


def notify_error(task_id: int, error: str):
    """Notify error."""
    work_item_id = _get_work_item_id(task_id) if task_id else "system"
    msg = f"🔴 {work_item_id}: ERROR — {error}"
    logger.error(msg)
    send_telegram(msg)


def notify_approve_requested(task_id: int):
    """Notify that analyst requests :approved:."""
    work_item_id = _get_work_item_id(task_id)
    msg = f"📋 {work_item_id}: BRD/ТЗ/AC готовы. Жду :approved: в Plane"
    logger.info(msg)
    send_telegram(msg)


def notify_done(task_id: int):
    """Notify task completion."""
    work_item_id = _get_work_item_id(task_id)
    msg = f"🎉 {work_item_id}: задача завершена!"
    logger.info(msg)
    send_telegram(msg)


def _get_work_item_id(task_id: int) -> str:
    """Get work_item_id from DB by task_id."""
    try:
        from .db import get_db
        conn = get_db()
        row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone()
        conn.close()
        return row["work_item_id"] if row and row["work_item_id"] else f"task-{task_id}"
    except Exception:
        return f"task-{task_id}"

4. Обновить вызовы в launcher.py

Файл: src/agents/launcher.py

  • В launch() после записи в БД: notify_agent_started(run_id, agent, task_id)
  • В _monitor_agent() после обновления exit_code: notify_agent_finished(run_id, agent, exit_code, task_id, duration_s)
  • В _try_advance_stage() при запросе approve: notify_approve_requested(task_id)

5. Обновить вызовы в plane.py и gitea.py

Убедиться, что все notify_stage_change(), notify_qg_failure(), notify_error() передают task_id (уже передают).


Файлы для изменения

Файл Изменения
.env Добавить ORCH_TELEGRAM_BOT_TOKEN, ORCH_TELEGRAM_CHAT_ID
src/config.py Добавить telegram_bot_token, telegram_chat_id
src/notifications.py Полная переработка: send_telegram() + обновлённые notify_*
src/agents/launcher.py Вызовы notify_agent_started, notify_agent_finished, notify_approve_requested

Ограничения

  • Никаких моделей — только детерминированный код
  • Fire-and-forget — если Telegram недоступен, orchestrator НЕ падает
  • parse_mode: HTML — для форматирования (bold, code)
  • НЕ менять порт 8500
  • НЕ ломать существующий цикл

Команды проверки

# 1. Тест отправки
docker exec orchestrator python -c "
from src.notifications import send_telegram
send_telegram('🧪 Тест уведомлений из Orchestrator')
print('sent')
"

# 2. Health
curl -s http://localhost:8500/health

Результат

После деплоя Слава получает в Telegram каждое значимое событие конвейера в реальном времени. Без моделей, без галлюцинаций — чистый детерминированный скрипт.