20 KiB
DEV TASK: ORCH-1 (F-2b) — Персистентная очередь задач вместо in-process daemon-потоков
Статус: Ready for dev
Проект: orchestrator
Plane: ORCH-1 (project 8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a, prefix ORCH)
Источник: tasks/orchestrator/AUDIT_2026-06-02.md (F-2b)
Исполнитель: Dev-агент (model: tokenator/claude-opus-4-8)
Приоритет: 🟠 №1 в бэклоге после ORCH-6
Проблема (зачем задача)
Сейчас агенты запускаются in-process (launcher.launch()):
subprocess.Popen+ 2daemon-thread (_watchdog+_monitor_agent) прямо в процессе uvicorn- 8 точек вызова из webhook-хэндлеров (plane ×4, gitea ×4), вызов СИНХРОННЫЙ → webhook-хэндлер ждёт спавна
- Рестарт orchestrator = катастрофа: daemon-threads умирают, claude-процессы становятся сиротами (reparent to init), M-1 orphan-recovery лишь помечает их
exit=-1и зовёт человека. Работа теряется. - Нет лимита параллелизма — N webhook'ов = N одновременных claude (RAM/CPU перегруз)
- Нет ретраев — упавший агент просто мёртв
Цель
Ввести персистентную очередь задач (job queue в SQLite) + воркер-петлю:
- Webhook-хэндлеры больше НЕ спавнят процессы напрямую → кладут job в очередь (
enqueue) и быстро отвечают - Отдельный фоновый воркер забирает jobs, уважает
max_concurrency, спавнит claude (переиспользуя текущую Popen-логику), обновляет статус - Рестарт-safe: при старте — running-jobs возвращаются в очередь (requeue) с учётом attempts
- Ретраи: упавший job (exit != 0) ретраится до
max_attempts, потомfailed+ нотификация
Критерий приёмки: webhook кладёт job (мгновенный ответ), воркер исполняет последовательно/с лимитом, рестарт не теряет задачи, упавшие ретраятся.
Инфраструктура
| Параметр | Значение |
|---|---|
| Сервер | slin@82.22.50.71 (SSH key) |
| Репо | /home/slin/repos/orchestrator/ (remote admin/orchestrator) |
| Контейнер | orchestrator (port 8500, network host) |
| Health | curl -s http://localhost:8500/health |
| Тесты | IMG=$(docker inspect orchestrator --format '{{.Config.Image}}'); docker run --rm -v /home/slin/repos/orchestrator:/code -w /code --entrypoint python3 $IMG -m pytest tests/ -q |
| БД | /app/data/orchestrator.db (внутри контейнера) |
⚠️ Хостовый .venv сломан — тесты ТОЛЬКО через образ.
⚠️ Тесты test_webhooks.py: 9 pre-existing падений (401/signature/TypeError) — НЕ твои, не трогать, но и не сломать остальные.
Текущая архитектура (собрано Стрим — не ищи вслепую)
src/agents/launcher.py (597 строк)
| Строка | Что |
|---|---|
58: def launch(self, agent, repo, task_content=None, task_id=None) -> int |
синхронный спавн |
~95: INSERT INTO agent_runs (task_id, agent) → run_id |
запись прогона |
131: proc = subprocess.Popen(["bash","-c",cmd], stdout=log_fh, ...) |
спавн claude |
154: threading.Thread(target=self._watchdog, daemon=True).start() |
таймаут-киллер |
163: threading.Thread(target=self._monitor_agent, daemon=True).start() |
ждёт, коммитит, advance stage |
_watchdog(pid, run_id, timeout=1800) |
time.sleep(timeout) → os.kill |
_monitor_agent(proc, run_id, agent, repo, branch, output_path, log_fh) |
proc.wait(), commit+push, advance stage, может вызвать след. launch |
AGENT_TIMEOUT = 1800 |
30 мин |
Точки вызова launcher.launch(...) (8 шт)
plane.py:189, 234, 308, 389gitea.py:126, 203, 275, 300
src/db.py — схема
tasks(id, plane_id, work_item_id, repo, branch, stage, agent_running, created_at, updated_at, plane_issue_id)
agent_runs(id, task_id→tasks, agent, started_at, finished_at, exit_code, output_path)
events(id, timestamp, source, event_type, payload, processed)
src/main.py — lifespan
init_db()+ M-1 orphan-recovery (running agent_runs старше 35 мин →exit=-1+ Telegram). Эту логику НЕ ломать, но дополнить queue-recovery.
Архитектура решения
1. Новая таблица jobs (очередь) в db.py
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
repo TEXT NOT NULL,
task_id INTEGER, -- FK tasks.id (nullable)
task_content TEXT, -- то, что сейчас идёт в task_file
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 2,
run_id INTEGER, -- agent_runs.id когда стартовал
error TEXT, -- последняя ошибка
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
finished_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
Хелперы в db.py: enqueue_job(agent, repo, task_content, task_id, max_attempts=2) -> job_id, claim_next_job() -> dict|None (атомарно: SELECT queued ORDER BY id LIMIT 1 → UPDATE running, used WHERE status='queued' для гонок), mark_job(job_id, status, run_id=None, error=None), count_running_jobs(), requeue_running_jobs() (для recovery), get_job(job_id).
2. src/queue_worker.py (НОВЫЙ) — воркер-петля
- Конфиг:
settings.max_concurrency(новый, дефолт 1),settings.queue_poll_interval(дефолт 2.0 сек). - Цикл (asyncio task, запущен в lifespan ИЛИ отдельный thread — выбери проще/надёжнее; рекомендую отдельный поток с while-loop + threading.Event для shutdown, т.к. launcher уже sync/threads):
- Пока
count_running_jobs() < max_concurrency:job = claim_next_job(); если None → break. - Для claimed job → вызвать
launcher.launch_job(job)(см. п.3) НЕблокирующе. sleep(poll_interval).
- Пока
- Воркер стартует в
main.pylifespan (после recovery), останавливается на shutdown (Event.set + join с таймаутом).
3. Рефактор launcher.py
- Сохрани существующую Popen+monitor+watchdog логику (B-1/B-2/M-1/ORCH-2 фиксы НЕ ломать!).
- Добавь
launch_job(self, job: dict) -> int: то же, чтоlaunch(), но:- При INSERT agent_runs запиши
run_id, обновиjobs.run_id,jobs.started_at. - В
_monitor_agentпо завершении: еслиexit_code == 0→mark_job(job_id, 'done'); иначе →attempts+1; еслиattempts < max_attempts→ вернуть вqueued(mark_job(job_id,'queued')); иначеmark_job(job_id,'failed', error=...)+ Telegram-нотификация. (Передайjob_idв monitor через args.) _watchdogпри таймаут-килле → тоже трактуй как fail (ретрай/failed).
- При INSERT agent_runs запиши
- Старый
launch()оставь как тонкую обёртку для обратной совместимости тестов:enqueue_job(...)и верни job_id? ⚠️ НЕТ — это сменит семантику (возвращал run_id). Вместо этого: оставьlaunch()рабочим для прямого синхронного запуска (тесты на него опираются), а воркер используетlaunch_job(). Если проще — пустьlaunch()дергает ту же внутреннюю реализацию_spawn(agent, repo, task_content, task_id, job_id=None). Реши по факту, главное — существующие тесты launcher НЕ красные.
4. Webhook-хэндлеры → enqueue
8 точек launcher.launch(agent, repo, task_desc, task_id=task_id) → заменить на enqueue_job(agent, repo, task_desc, task_id=task_id).
⚠️ _monitor_agent сейчас сам вызывает следующий launch (advance stage внутри). Это ОК — пусть внутри monitor advance-цепочка тоже идёт через enqueue_job (положит следующий job в очередь, воркер подхватит). Проверь все внутренние launch-вызовы в launcher.
5. main.py lifespan
- После M-1 orphan-recovery (оставить) добавь queue-recovery:
requeue_running_jobs()— jobs соstatus='running'при старте → вернуть вqueued(процесс умер на рестарте), attempts не трогать или +0. Логировать. - Запустить воркер-петлю. На shutdown — остановить.
6. config.py
max_concurrency: int = 1 # ORCH-1: parallel agent jobs
queue_poll_interval: float = 2.0 # ORCH-1: worker poll seconds
7. /status и новый /queue эндпоинт
- Дополнить
/statusили добавитьGET /queue: counts по статусам (queued/running/done/failed) + последние 10 jobs. Для наблюдаемости.
Задачи (по шагам)
- T1. Прочитать
AUDIT_2026-06-02.md,launcher.py,db.py,main.py,webhooks/*.py. Веткаfeature/ORCH-1-job-queue,git statusчистый. - T2.
db.py: таблицаjobs+ индекс + хелперы (enqueue/claim/mark/count/requeue/get).claim_next_jobатомарный (защита от гонки воркера). - T3.
config.py:max_concurrency,queue_poll_interval. - T4.
launcher.py:launch_job()+ рефактор monitor/watchdog на job-статусы + ретраи. НЕ ломать B-1/B-2/M-1/ORCH-2. - T5. Webhook-хэндлеры (8 точек) + внутренние advance-вызовы →
enqueue_job. - T6.
queue_worker.py+ запуск/останов вmain.pylifespan + queue-recovery. - T7.
/queue(или расширить/status). - T8. Тесты
tests/test_queue.py: enqueue→claim→mark; claim атомарность (нет двойной выдачи); ретрай (fail→queued пока attempts<max, потом failed); requeue_running_jobs; max_concurrency (воркер не превышает). Мокать реальный Popen/claude (НЕ запускать настоящего агента). - T9. Прогнать ВСЕ тесты в контейнере. Новые зелёные; pre-existing 9 — не трогать.
- T10. Доки:
docs/ARCHITECTURE.md(раздел job queue, диаграмма flow webhook→queue→worker→agent),docs/BUGFIXES_*илиdocs/ORCH-1_*.md. README — проORCH_MAX_CONCURRENCY. - T11. Коммиты (Conventional Commits, отдельные по смыслу), push, PR в orchestrator.
- T12. Деплой:
docker compose up -d --build && sleep 6 && curl -s :8500/health. Проверка: enqueue тестовый job через/webhook(с валидной HMAC-подписью, секрет в env контейнера) ИЛИ напрямуюenqueue_jobвdocker exec→ убедиться job появился вjobs, воркер его взял (status running→done/failed),/queueпоказывает.
ДОПОЛНЕНИЕ (от Славы, 2026-06-02): preflight + 429/rate-limit + backoff + circuit breaker
⚠️ Важное расширение надёжности. Сейчас агент падает при недоступности CLI/429 как обычный фейл — это неправильно. Два РАЗНЫХ класса проблем, лечатся по-разному.
A. Дешёвый preflight (CLI/сеть доступны?) — НЕ жжёт токены
- Перед тем как воркер claim'ит job — дешёвая проверка (
src/preflight.pyили функция в launcher):os.path.exists(CLAUDE_BIN)— мгновенноclaude --version(subprocess, timeout 5с) — токены НЕ тратит- (опционально) TCP-connect до endpoint Tokenator — «порт жив?», бесплатно
- Кэшировать результат ~30-60с (не дёргать на каждый тик). Конфиг:
preflight_cache_ttl: int = 45. - Если preflight FAIL → воркер НЕ claim'ит job (остаётся
queued), логирует, ждёт следующий тик. Никто не падает впустую. - 🚫 НЕ делать prompt-ping (ping→pong) перед каждым job — это трата лимита и латентность. Только local-проверки.
B. 429/rate-limit — детектить НА ВЫХОДЕ, не предсказывать
- Rate limit НЕЛЬЗЯ надёжно предсказать заранее — ловить по результату прогона.
- В
_monitor_agentпосле завершения: распарсить log/stderr на паттерны:429,rate limit,overloaded,rate_limit_error,Retry-After,quota. Классификатор вsrc/error_classifier.py(или функция): вернётtransient(429/overload/сеть) илиpermanent(code-fault). - Разные ветки ретраев:
transient(429/недоступность) → backoff-ретрай, attempts НЕ инкрементить как code-fault (или отдельный счётчикtransient_attemptsс большим лимитом, напр. 5)permanent(code-fault) → обычные attempts < max_attempts (2), потомfailed
C. Backoff + available_at в очереди
- Добавить в таблицу
jobsколонкуavailable_at TEXT(когда job снова можно брать) +transient_attempts INTEGER DEFAULT 0. claim_next_job→WHERE status='queued' AND (available_at IS NULL OR available_at <= datetime('now')).- При transient-фейле:
available_at = now + backoff. Exponential backoff: напр.min(2^transient_attempts * base, max_backoff), base=10с, max=600с. УважатьRetry-Afterесли сервер прислал (распарсить из лога, если есть).
D. Circuit breaker (рубильник)
- Если подряд N (напр. 3) job падают с transient/недоступностью → открыть breaker: воркер паузит на M минут (напр. 5), ВООБЩЕ не дёргает CLI (не тратит попытки/лимит), шлёт Telegram-алерт.
- Через M мин → half-open: пробует ОДИН job. Ожил (exit 0) → закрыть breaker. Опять transient → снова пауза.
- Состояние breaker в памяти воркера (или лёгкая таблица/файл) + отражать в
/queue. - Конфиг:
breaker_threshold: int = 3,breaker_pause_seconds: int = 300.
Конфиг (добавить в config.py)
preflight_cache_ttl: int = 45 # кэш дешёвого preflight, сек
backoff_base_seconds: int = 10 # transient backoff base
backoff_max_seconds: int = 600 # потолок backoff
transient_max_attempts: int = 5 # ретраи для 429/недоступности
breaker_threshold: int = 3 # сколько transient подряд до открытия
breaker_pause_seconds: int = 300 # пауза при открытом breaker
Доп-тесты (в test_queue.py или test_resilience.py)
- preflight FAIL → job остаётся queued, не спавнится
- preflight кэш (не дёргает
claude --versionчаще ttl) - классификатор: лог с "429"/"rate limit"/"overloaded" → transient; обычная ошибка → permanent
- transient fail →
available_atв будущем, claim его не берёт пока не наступит время - backoff растёт экспоненциально; Retry-After уважается
- breaker: 3 transient подряд → open (воркер паузит, CLI не дёргается); half-open → ожил → closed
Acceptance (проверит Стрим)
| # | Проверка | Ожидаемо |
|---|---|---|
| 1 | webhook кладёт job | мгновенный ответ, job в jobs status=queued |
| 2 | воркер исполняет | queued→running→done (exit 0) |
| 3 | лимит | running не превышает max_concurrency |
| 4 | ретрай | fail→queued (attempts<max), потом failed+notify |
| 5 | рестарт-safe | running при старте → requeue, не теряется |
| 6 | M-1 orphan-recovery | НЕ сломан |
| 7 | тесты | new green, 9 pre-existing не тронуты |
| 8 | /queue |
counts + последние jobs |
| 9 | B-1/B-2/ORCH-2/ORCH-6 | не сломаны |
| 10 | preflight | local-only, кэш, не жжёт токены; FAIL → job queued, не падает |
| 11 | 429-классификатор | transient vs permanent разделены |
| 12 | backoff | transient → available_at в будущем, exp backoff, Retry-After |
| 13 | circuit breaker | N transient подряд → пауза+алерт, half-open → recover |
Ограничения
- 🚫 НЕ трогай: nginx, openclaw.json, .env-СЕКРЕТЫ (новые ключи
ORCH_MAX_CONCURRENCY,ORCH_QUEUE_POLL_INTERVAL— можно), deploy-хук, Plane-webhookis_active. - ⚠️ НЕ ломай фиксы: B-1 (task-file write), B-2 (Popen→log_fh, no PIPE, init:true reaper), M-1 (orphan-recovery), ORCH-2 (worktree per task), ORCH-6 (project registry/filter).
- ⚠️
claim_next_jobДОЛЖЕН быть atomic — иначе два воркер-тика выдадут один job дважды. (При max_concurrency=1 в одном потоке гонки нет, но сделай корректно на будущее.) - ⚠️ Обратная совместимость: существующие тесты launcher/webhooks (кроме 9 pre-existing) остаются зелёными.
- 🚫 НЕ переписывай claude-CLI спавн (Popen-логика рабочая) — только оборачивай в job-слой.
- 🚫 Внешняя очередь (Redis/Celery) НЕ нужна — SQLite-таблица достаточна (single-node).
Деплой-чеклист
jobsтаблица + хелперы- config: max_concurrency, poll_interval
- launch_job + monitor/watchdog на статусы + ретраи
- 8 webhook-точек + внутренние → enqueue_job
- queue_worker + lifespan start/stop + queue-recovery
- /queue эндпоинт
- тесты зелёные (кроме 9 pre-existing)
- орк пересобран, health ok
- живой прогон: job queued→running→done
- доки + BUGFIXES
- PR создан
- отчёт Стрим (по каждому T — короткий статус + результат проверки)
Создано: 2026-06-02 | Автор ТЗ: Стрим | Исполнитель: Dev (Opus 4.8 Tokenator)