Files
wiki/tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md
2026-06-02 23:50:16 +03:00

15 KiB
Raw Blame History

DEV TASK: ORCH-1 (F-2b) — Персистентная очередь задач вместо in-process daemon-потоков

Статус: Ready for dev Проект: orchestrator Plane: ORCH-1 (project 8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a, prefix ORCH) Источник: tasks/orchestrator/AUDIT_2026-06-02.md (F-2b) Исполнитель: Dev-агент (model: tokenator/claude-opus-4-8) Приоритет: 🟠 №1 в бэклоге после ORCH-6


Проблема (зачем задача)

Сейчас агенты запускаются in-process (launcher.launch()):

  • subprocess.Popen + 2 daemon-thread (_watchdog + _monitor_agent) прямо в процессе uvicorn
  • 8 точек вызова из webhook-хэндлеров (plane ×4, gitea ×4), вызов СИНХРОННЫЙ → webhook-хэндлер ждёт спавна
  • Рестарт orchestrator = катастрофа: daemon-threads умирают, claude-процессы становятся сиротами (reparent to init), M-1 orphan-recovery лишь помечает их exit=-1 и зовёт человека. Работа теряется.
  • Нет лимита параллелизма — N webhook'ов = N одновременных claude (RAM/CPU перегруз)
  • Нет ретраев — упавший агент просто мёртв

Цель

Ввести персистентную очередь задач (job queue в SQLite) + воркер-петлю:

  • Webhook-хэндлеры больше НЕ спавнят процессы напрямую → кладут job в очередь (enqueue) и быстро отвечают
  • Отдельный фоновый воркер забирает jobs, уважает max_concurrency, спавнит claude (переиспользуя текущую Popen-логику), обновляет статус
  • Рестарт-safe: при старте — running-jobs возвращаются в очередь (requeue) с учётом attempts
  • Ретраи: упавший job (exit != 0) ретраится до max_attempts, потом failed + нотификация

Критерий приёмки: webhook кладёт job (мгновенный ответ), воркер исполняет последовательно/с лимитом, рестарт не теряет задачи, упавшие ретраятся.


Инфраструктура

Параметр Значение
Сервер slin@82.22.50.71 (SSH key)
Репо /home/slin/repos/orchestrator/ (remote admin/orchestrator)
Контейнер orchestrator (port 8500, network host)
Health curl -s http://localhost:8500/health
Тесты IMG=$(docker inspect orchestrator --format '{{.Config.Image}}'); docker run --rm -v /home/slin/repos/orchestrator:/code -w /code --entrypoint python3 $IMG -m pytest tests/ -q
БД /app/data/orchestrator.db (внутри контейнера)

⚠️ Хостовый .venv сломан — тесты ТОЛЬКО через образ. ⚠️ Тесты test_webhooks.py: 9 pre-existing падений (401/signature/TypeError) — НЕ твои, не трогать, но и не сломать остальные.


Текущая архитектура (собрано Стрим — не ищи вслепую)

src/agents/launcher.py (597 строк)

Строка Что
58: def launch(self, agent, repo, task_content=None, task_id=None) -> int синхронный спавн
~95: INSERT INTO agent_runs (task_id, agent)run_id запись прогона
131: proc = subprocess.Popen(["bash","-c",cmd], stdout=log_fh, ...) спавн claude
154: threading.Thread(target=self._watchdog, daemon=True).start() таймаут-киллер
163: threading.Thread(target=self._monitor_agent, daemon=True).start() ждёт, коммитит, advance stage
_watchdog(pid, run_id, timeout=1800) time.sleep(timeout)os.kill
_monitor_agent(proc, run_id, agent, repo, branch, output_path, log_fh) proc.wait(), commit+push, advance stage, может вызвать след. launch
AGENT_TIMEOUT = 1800 30 мин

Точки вызова launcher.launch(...) (8 шт)

  • plane.py:189, 234, 308, 389
  • gitea.py:126, 203, 275, 300

src/db.py — схема

tasks(id, plane_id, work_item_id, repo, branch, stage, agent_running, created_at, updated_at, plane_issue_id)
agent_runs(id, task_idtasks, agent, started_at, finished_at, exit_code, output_path)
events(id, timestamp, source, event_type, payload, processed)

src/main.py — lifespan

  • init_db() + M-1 orphan-recovery (running agent_runs старше 35 мин → exit=-1 + Telegram). Эту логику НЕ ломать, но дополнить queue-recovery.

Архитектура решения

1. Новая таблица jobs (очередь) в db.py

CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent TEXT NOT NULL,
    repo TEXT NOT NULL,
    task_id INTEGER,                 -- FK tasks.id (nullable)
    task_content TEXT,               -- то, что сейчас идёт в task_file
    status TEXT NOT NULL DEFAULT 'queued',  -- queued|running|done|failed
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 2,
    run_id INTEGER,                  -- agent_runs.id когда стартовал
    error TEXT,                      -- последняя ошибка
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    finished_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);

Хелперы в db.py: enqueue_job(agent, repo, task_content, task_id, max_attempts=2) -> job_id, claim_next_job() -> dict|None (атомарно: SELECT queued ORDER BY id LIMIT 1 → UPDATE running, used WHERE status='queued' для гонок), mark_job(job_id, status, run_id=None, error=None), count_running_jobs(), requeue_running_jobs() (для recovery), get_job(job_id).

2. src/queue_worker.py (НОВЫЙ) — воркер-петля

  • Конфиг: settings.max_concurrency (новый, дефолт 1), settings.queue_poll_interval (дефолт 2.0 сек).
  • Цикл (asyncio task, запущен в lifespan ИЛИ отдельный thread — выбери проще/надёжнее; рекомендую отдельный поток с while-loop + threading.Event для shutdown, т.к. launcher уже sync/threads):
    1. Пока count_running_jobs() < max_concurrency: job = claim_next_job(); если None → break.
    2. Для claimed job → вызвать launcher.launch_job(job) (см. п.3) НЕблокирующе.
    3. sleep(poll_interval).
  • Воркер стартует в main.py lifespan (после recovery), останавливается на shutdown (Event.set + join с таймаутом).

3. Рефактор launcher.py

  • Сохрани существующую Popen+monitor+watchdog логику (B-1/B-2/M-1/ORCH-2 фиксы НЕ ломать!).
  • Добавь launch_job(self, job: dict) -> int: то же, что launch(), но:
    • При INSERT agent_runs запиши run_id, обнови jobs.run_id, jobs.started_at.
    • В _monitor_agent по завершении: если exit_code == 0mark_job(job_id, 'done'); иначе → attempts+1; если attempts < max_attempts → вернуть в queued (mark_job(job_id,'queued')); иначе mark_job(job_id,'failed', error=...) + Telegram-нотификация. (Передай job_id в monitor через args.)
    • _watchdog при таймаут-килле → тоже трактуй как fail (ретрай/failed).
  • Старый launch() оставь как тонкую обёртку для обратной совместимости тестов: enqueue_job(...) и верни job_id? ⚠️ НЕТ — это сменит семантику (возвращал run_id). Вместо этого: оставь launch() рабочим для прямого синхронного запуска (тесты на него опираются), а воркер использует launch_job(). Если проще — пусть launch() дергает ту же внутреннюю реализацию _spawn(agent, repo, task_content, task_id, job_id=None). Реши по факту, главное — существующие тесты launcher НЕ красные.

4. Webhook-хэндлеры → enqueue

8 точек launcher.launch(agent, repo, task_desc, task_id=task_id) → заменить на enqueue_job(agent, repo, task_desc, task_id=task_id). ⚠️ _monitor_agent сейчас сам вызывает следующий launch (advance stage внутри). Это ОК — пусть внутри monitor advance-цепочка тоже идёт через enqueue_job (положит следующий job в очередь, воркер подхватит). Проверь все внутренние launch-вызовы в launcher.

5. main.py lifespan

  • После M-1 orphan-recovery (оставить) добавь queue-recovery: requeue_running_jobs() — jobs со status='running' при старте → вернуть в queued (процесс умер на рестарте), attempts не трогать или +0. Логировать.
  • Запустить воркер-петлю. На shutdown — остановить.

6. config.py

max_concurrency: int = 1          # ORCH-1: parallel agent jobs
queue_poll_interval: float = 2.0  # ORCH-1: worker poll seconds

7. /status и новый /queue эндпоинт

  • Дополнить /status или добавить GET /queue: counts по статусам (queued/running/done/failed) + последние 10 jobs. Для наблюдаемости.

Задачи (по шагам)

  • T1. Прочитать AUDIT_2026-06-02.md, launcher.py, db.py, main.py, webhooks/*.py. Ветка feature/ORCH-1-job-queue, git status чистый.
  • T2. db.py: таблица jobs + индекс + хелперы (enqueue/claim/mark/count/requeue/get). claim_next_job атомарный (защита от гонки воркера).
  • T3. config.py: max_concurrency, queue_poll_interval.
  • T4. launcher.py: launch_job() + рефактор monitor/watchdog на job-статусы + ретраи. НЕ ломать B-1/B-2/M-1/ORCH-2.
  • T5. Webhook-хэндлеры (8 точек) + внутренние advance-вызовы → enqueue_job.
  • T6. queue_worker.py + запуск/останов в main.py lifespan + queue-recovery.
  • T7. /queue (или расширить /status).
  • T8. Тесты tests/test_queue.py: enqueue→claim→mark; claim атомарность (нет двойной выдачи); ретрай (fail→queued пока attempts<max, потом failed); requeue_running_jobs; max_concurrency (воркер не превышает). Мокать реальный Popen/claude (НЕ запускать настоящего агента).
  • T9. Прогнать ВСЕ тесты в контейнере. Новые зелёные; pre-existing 9 — не трогать.
  • T10. Доки: docs/ARCHITECTURE.md (раздел job queue, диаграмма flow webhook→queue→worker→agent), docs/BUGFIXES_* или docs/ORCH-1_*.md. README — про ORCH_MAX_CONCURRENCY.
  • T11. Коммиты (Conventional Commits, отдельные по смыслу), push, PR в orchestrator.
  • T12. Деплой: docker compose up -d --build && sleep 6 && curl -s :8500/health. Проверка: enqueue тестовый job через /webhook (с валидной HMAC-подписью, секрет в env контейнера) ИЛИ напрямую enqueue_job в docker exec → убедиться job появился в jobs, воркер его взял (status running→done/failed), /queue показывает.

Acceptance (проверит Стрим)

# Проверка Ожидаемо
1 webhook кладёт job мгновенный ответ, job в jobs status=queued
2 воркер исполняет queued→running→done (exit 0)
3 лимит running не превышает max_concurrency
4 ретрай fail→queued (attempts<max), потом failed+notify
5 рестарт-safe running при старте → requeue, не теряется
6 M-1 orphan-recovery НЕ сломан
7 тесты new green, 9 pre-existing не тронуты
8 /queue counts + последние jobs
9 B-1/B-2/ORCH-2/ORCH-6 не сломаны

Ограничения

  • 🚫 НЕ трогай: nginx, openclaw.json, .env-СЕКРЕТЫ (новые ключи ORCH_MAX_CONCURRENCY, ORCH_QUEUE_POLL_INTERVAL — можно), deploy-хук, Plane-webhook is_active.
  • ⚠️ НЕ ломай фиксы: B-1 (task-file write), B-2 (Popen→log_fh, no PIPE, init:true reaper), M-1 (orphan-recovery), ORCH-2 (worktree per task), ORCH-6 (project registry/filter).
  • ⚠️ claim_next_job ДОЛЖЕН быть atomic — иначе два воркер-тика выдадут один job дважды. (При max_concurrency=1 в одном потоке гонки нет, но сделай корректно на будущее.)
  • ⚠️ Обратная совместимость: существующие тесты launcher/webhooks (кроме 9 pre-existing) остаются зелёными.
  • 🚫 НЕ переписывай claude-CLI спавн (Popen-логика рабочая) — только оборачивай в job-слой.
  • 🚫 Внешняя очередь (Redis/Celery) НЕ нужна — SQLite-таблица достаточна (single-node).

Деплой-чеклист

  • jobs таблица + хелперы
  • config: max_concurrency, poll_interval
  • launch_job + monitor/watchdog на статусы + ретраи
  • 8 webhook-точек + внутренние → enqueue_job
  • queue_worker + lifespan start/stop + queue-recovery
  • /queue эндпоинт
  • тесты зелёные (кроме 9 pre-existing)
  • орк пересобран, health ok
  • живой прогон: job queued→running→done
  • доки + BUGFIXES
  • PR создан
  • отчёт Стрим (по каждому T — короткий статус + результат проверки)

Создано: 2026-06-02 | Автор ТЗ: Стрим | Исполнитель: Dev (Opus 4.8 Tokenator)