7.7 KiB
ORCH-1 (F-2b): Persistent Job Queue
Дата: 2026-06-02
Ветка: feature/ORCH-1-job-queue
Источник: AUDIT_2026-06-02 (B-2 / F-2b)
Проблема
Агенты запускались in-process: launcher.launch() синхронно спавнил
subprocess.Popen + 2 daemon-thread (_watchdog, _monitor_agent) прямо в
процессе uvicorn, из 8 webhook-точек. Последствия:
- Рестарт = катастрофа. daemon-threads умирают, claude-процессы → сироты,
работа теряется (M-1 лишь помечал
exit=-1и звал человека). - Нет лимита параллелизма — N webhook'ов = N одновременных claude.
- Нет ретраев — упавший агент просто мёртв.
Решение
Персистентная очередь задач (SQLite-таблица jobs) + фоновый воркер:
- Webhook-хэндлер кладёт job (
enqueue_job) → мгновенный ответ 200. - Фоновый воркер (
src/queue_worker.py, отдельный daemon-thread) забирает jobs с учётомmax_concurrency(claim_next_job, атомарно) и спавнит агента (launcher.launch_job, та же Popen-логика). - По завершении
_monitor_agent→_finalize_job:exit 0→done;exit != 0&attempts < max_attempts→ requeue (queued);exit != 0&attempts >= max_attempts→failed+ Telegram.
Что изменено
| Файл | Изменение |
|---|---|
src/db.py |
Таблица jobs + индекс; хелперы enqueue_job, claim_next_job (атомарный), mark_job, count_running_jobs, requeue_running_jobs, get_job, job_status_counts, recent_jobs |
src/config.py |
max_concurrency (env ORCH_MAX_CONCURRENCY, default 1), queue_poll_interval (env ORCH_QUEUE_POLL_INTERVAL, default 2.0) |
src/agents/launcher.py |
launch() → тонкая обёртка над _spawn(); новый launch_job(job); _spawn() (общий, job_id опционально); monitor/watchdog принимают job_id; новый _finalize_job() (статусы + ретраи). 4 внутренних advance-вызова self.launch → enqueue_job |
src/webhooks/plane.py |
4 точки launcher.launch → enqueue_job |
src/webhooks/gitea.py |
4 точки launcher.launch → enqueue_job |
src/queue_worker.py |
НОВЫЙ — QueueWorker (drain loop + max_concurrency + graceful stop) |
src/main.py |
lifespan: queue-recovery (requeue_running_jobs) после M-1, старт/останов воркера; новый GET /queue |
tests/test_queue.py |
НОВЫЙ — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
Атомарность claim
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
(проверено test_concurrent_claims_no_duplicate — 8 потоков, 20 jobs).
Сохранённые фиксы (НЕ сломаны)
- B-1 task-file write (direct
open()в worktree) — без изменений. - B-2 Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
- M-1 orphan-recovery в
main.py— оставлен, queue-recovery добавлен ПОСЛЕ него. - ORCH-2 worktree per task — без изменений.
- ORCH-6 project registry/filter — без изменений.
Acceptance
| # | Проверка | Статус |
|---|---|---|
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
| 8 | /queue |
✅ counts + recent |
Тесты
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
--entrypoint python3 $IMG -m pytest tests/ -q
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker)
Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса проблем лечатся по-разному.
A. Дешёвый preflight (src/preflight.py) — не жжёт токены
Перед claim воркер проверяет: os.path.exists(CLAUDE_BIN) + claude --version
(timeout 5с, токены НЕ тратит). Результат кэшируется preflight_cache_ttl (45с).
FAIL → воркер НЕ claim’ит (job остаётся queued), ждёт. 🚫 НЕТ prompt-ping.
B. 429 — детект НА ВЫХОДЕ (src/error_classifier.py)
rate-limit нельзя предсказать — классифицируем по логу прогона. classify_log_file
читает хвост лога (16KB), ищет 429/rate limit/overloaded/quota/503/529/timeout/...
→ transient или permanent. Извлекает Retry-After.
- transient (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ
transient_attempts(лимитtransient_max_attempts=5) — не жжёт code-fault бюджет. - permanent (code-fault) → обычные
attempts < max_attempts(2), потомfailed.
C. Backoff + available_at
Колонки jobs.available_at TEXT + jobs.transient_attempts INTEGER (миграция
_ensure_column). claim_next_job: WHERE status='queued' AND (available_at IS NULL OR available_at <= datetime('now')). При transient: available_at = now + min(2^n * base, max) (base=10с, max=600с), Retry-After уважается (берёмся max).
D. Circuit breaker (CircuitBreaker в queue_worker)
N=3 transient подряд → open: воркер паузит breaker_pause_seconds=300, ВООБЩЕ
не дёргает CLI, Telegram-алерт. Через паузу → half-open (пробует 1 job);
ожил (exit 0) → closed; снова transient → опять open. Состояние в памяти
воркера, отражается в /queue.resilience.
Связь launcher→breaker — через callback launcher.on_outcome (без import-цикла).
Конфиг (config.py)
preflight_cache_ttl=45, backoff_base_seconds=10, backoff_max_seconds=600,
transient_max_attempts=5, breaker_threshold=3, breaker_pause_seconds=300.
Тесты
tests/test_resilience.py — 34 теста: preflight (FAIL→queued, кэш, force),
классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After,
available_at гейтинг), launcher transient/permanent finalize, breaker
(open/half-open/closed/re-open, блок claim).