Files
orchestrator/docs/ORCH-1_JOB_QUEUE.md

128 lines
7.7 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ORCH-1 (F-2b): Persistent Job Queue
**Дата:** 2026-06-02
**Ветка:** `feature/ORCH-1-job-queue`
**Источник:** AUDIT_2026-06-02 (B-2 / F-2b)
## Проблема
Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил
`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в
процессе uvicorn, из **8 webhook-точек**. Последствия:
- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты,
работа теряется (M-1 лишь помечал `exit=-1` и звал человека).
- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude.
- **Нет ретраев** — упавший агент просто мёртв.
## Решение
Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер:
1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200.
2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает
jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента
(`launcher.launch_job`, та же Popen-логика).
3. По завершении `_monitor_agent``_finalize_job`:
- `exit 0``done`;
- `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
- `exit != 0` & `attempts >= max_attempts``failed` + Telegram.
## Что изменено
| Файл | Изменение |
|------|-----------|
| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch``enqueue_job` |
| `src/webhooks/plane.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/webhooks/gitea.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/queue_worker.py` | **НОВЫЙ**`QueueWorker` (drain loop + max_concurrency + graceful stop) |
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` |
| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
## Атомарность claim
```sql
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
```
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs).
## Сохранённые фиксы (НЕ сломаны)
- **B-1** task-file write (direct `open()` в worktree) — без изменений.
- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него.
- **ORCH-2** worktree per task — без изменений.
- **ORCH-6** project registry/filter — без изменений.
## Acceptance
| # | Проверка | Статус |
|---|----------|--------|
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
| 8 | `/queue` | ✅ counts + recent |
## Тесты
```bash
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
--entrypoint python3 $IMG -m pytest tests/ -q
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
```
---
## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker)
Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса
проблем лечатся по-разному.
### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены
Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version`
(timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с).
FAIL → воркер НЕ claimит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping.
### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`)
rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file`
читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...`
`transient` или `permanent`. Извлекает `Retry-After`.
- **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts`
(лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет.
- **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`.
### C. Backoff + `available_at`
Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция
`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL
OR available_at <= datetime('now'))`. При transient: `available_at = now +
min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max).
### D. Circuit breaker (`CircuitBreaker` в queue_worker)
N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ
не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job);
ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти
воркера, отражается в `/queue.resilience`.
Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла).
### Конфиг (config.py)
`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`,
`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`.
### Тесты
`tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force),
классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After,
`available_at` гейтинг), launcher transient/permanent finalize, breaker
(open/half-open/closed/re-open, блок claim).