# ORCH-1 (F-2b): Persistent Job Queue **Дата:** 2026-06-02 **Ветка:** `feature/ORCH-1-job-queue` **Источник:** AUDIT_2026-06-02 (B-2 / F-2b) ## Проблема Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил `subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в процессе uvicorn, из **8 webhook-точек**. Последствия: - **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты, работа теряется (M-1 лишь помечал `exit=-1` и звал человека). - **Нет лимита параллелизма** — N webhook'ов = N одновременных claude. - **Нет ретраев** — упавший агент просто мёртв. ## Решение Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер: 1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200. 2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента (`launcher.launch_job`, та же Popen-логика). 3. По завершении `_monitor_agent` → `_finalize_job`: - `exit 0` → `done`; - `exit != 0` & `attempts < max_attempts` → requeue (`queued`); - `exit != 0` & `attempts >= max_attempts` → `failed` + Telegram. ## Что изменено | Файл | Изменение | |------|-----------| | `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` | | `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) | | `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch` → `enqueue_job` | | `src/webhooks/plane.py` | 4 точки `launcher.launch` → `enqueue_job` | | `src/webhooks/gitea.py` | 4 точки `launcher.launch` → `enqueue_job` | | `src/queue_worker.py` | **НОВЫЙ** — `QueueWorker` (drain loop + max_concurrency + graceful stop) | | `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` | | `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) | ## Атомарность claim ```sql SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1; UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now') WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку ``` Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера (проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs). ## Сохранённые фиксы (НЕ сломаны) - **B-1** task-file write (direct `open()` в worktree) — без изменений. - **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут. - **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него. - **ORCH-2** worktree per task — без изменений. - **ORCH-6** project registry/filter — без изменений. ## Acceptance | # | Проверка | Статус | |---|----------|--------| | 1 | webhook кладёт job (queued) | ✅ enqueue_job | | 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job | | 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency | | 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail | | 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan | | 6 | M-1 не сломан | ✅ оставлен в lifespan | | 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing | | 8 | `/queue` | ✅ counts + recent | ## Тесты ```bash IMG=$(docker inspect orchestrator --format '{{.Config.Image}}') docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \ --entrypoint python3 $IMG -m pytest tests/ -q # 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError) ``` --- ## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker) Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса проблем лечатся по-разному. ### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version` (timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с). FAIL → воркер НЕ claim’ит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping. ### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`) rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file` читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...` → `transient` или `permanent`. Извлекает `Retry-After`. - **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts` (лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет. - **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`. ### C. Backoff + `available_at` Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция `_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL OR available_at <= datetime('now'))`. При transient: `available_at = now + min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max). ### D. Circuit breaker (`CircuitBreaker` в queue_worker) N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job); ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти воркера, отражается в `/queue.resilience`. Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла). ### Конфиг (config.py) `preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`, `transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`. ### Тесты `tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force), классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After, `available_at` гейтинг), launcher transient/permanent finalize, breaker (open/half-open/closed/re-open, блок claim).