docs(queue): document job queue, /queue, env vars (ORCH-1)
ARCHITECTURE job-queue section + flow diagram, README /queue endpoint and ORCH_MAX_CONCURRENCY/ORCH_QUEUE_POLL_INTERVAL, new docs/ORCH-1_JOB_QUEUE.md.
This commit is contained in:
@@ -264,9 +264,71 @@ services:
|
||||
|
||||
- ~~Shared `/repos` checkout (гонки при параллельных задачах).~~ **РЕШЕНО (ORCH-2 / S-4):**
|
||||
git worktree per task/branch — см. раздел «Изоляция через git worktree» ниже.
|
||||
- **In-process daemon-потоки.** Агенты запускаются в daemon-потоках uvicorn. При
|
||||
рестарте uvicorn запущенные агенты осиротевают → ловит orphan-recovery (M-1).
|
||||
Целевая архитектура — очередь задач (F-2b, отдельно).
|
||||
- ~~In-process daemon-потоки (рестарт → сироты, потеря работы).~~ **РЕШЕНО (ORCH-1 / F-2b):**
|
||||
персистентная очередь jobs + фоновый воркер — см. раздел «Очередь задач (ORCH-1)» ниже.
|
||||
Daemon-потоки monitor/watchdog остаются для одного запущенного агента, но при
|
||||
рестарте его job возвращается в `queued` (queue-recovery) и переподхватывается.
|
||||
|
||||
## Очередь задач (ORCH-1 / F-2b)
|
||||
|
||||
Раньше webhook-хэндлер **синхронно** спавнил `subprocess.Popen` + 2 daemon-thread
|
||||
прямо в процессе uvicorn (8 точек вызова). Рестарт = сироты + потеря работы,
|
||||
нет лимита параллелизма, нет ретраев.
|
||||
|
||||
### Flow
|
||||
|
||||
```
|
||||
webhook (plane/gitea) background thread (queue_worker)
|
||||
│ │
|
||||
enqueue_job() ---> [ jobs table ] <--- claim_next_job() (atomic queued->running)
|
||||
(мгновенный status=queued │
|
||||
ответ 200) launch_job(job)
|
||||
│
|
||||
AgentLauncher._spawn (Popen claude)
|
||||
│
|
||||
_monitor_agent (proc.wait, commit/push,
|
||||
│ advance stage)
|
||||
│
|
||||
_finalize_job:
|
||||
exit 0 -> mark_job done
|
||||
exit !=0 & attempts<max -> requeue (queued)
|
||||
exit !=0 & attempts>=max -> failed + Telegram
|
||||
```
|
||||
|
||||
### Таблица `jobs`
|
||||
|
||||
| Колонка | Назначение |
|
||||
|--------|------------|
|
||||
| `status` | `queued` → `running` → `done` \| `failed` |
|
||||
| `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) |
|
||||
| `run_id` | FK на `agent_runs.id` после старта |
|
||||
| `task_content` | ТЗ, которое пишется в task-файл агента |
|
||||
| `error` | последняя ошибка |
|
||||
|
||||
`idx_jobs_status (status, id)` — быстрый FIFO-выбор queued.
|
||||
|
||||
### Атомарный claim
|
||||
|
||||
`claim_next_job()` делает `SELECT queued ORDER BY id LIMIT 1` → `UPDATE ... WHERE id=? AND
|
||||
status='queued'` и проверяет `rowcount`. При гонке двух тиков лишь один UPDATE
|
||||
переведёт строку в `running` (rowcount==1); проигравший берёт следующий job.
|
||||
|
||||
### Queue-recovery (рестарт-safe)
|
||||
|
||||
В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`:
|
||||
jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`.
|
||||
Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
|
||||
|
||||
### Конфиг
|
||||
|
||||
- `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs.
|
||||
- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0) — период опроса.
|
||||
|
||||
Наблюдаемость: `GET /queue` — counts по статусам + последние 10 jobs.
|
||||
|
||||
> Совместимость: `launcher.launch()` (прямой синхронный запуск, `job_id=None`)
|
||||
> сохранён для обратной совместимости. Очередь использует `launch_job()`;
|
||||
> оба разделяют `_spawn()` (Popen-логика B-2 не изменена).
|
||||
- **Gitea CI не настроен.** QG развития теперь локальный (`check_tests_local`);
|
||||
Gitea CI-статусы не являются authoritative и не блокируют pipeline.
|
||||
- **Docker внутри контейнера orchestrator НЕДОСТУПЕН.** Деплой идёт только через
|
||||
|
||||
83
docs/ORCH-1_JOB_QUEUE.md
Normal file
83
docs/ORCH-1_JOB_QUEUE.md
Normal file
@@ -0,0 +1,83 @@
|
||||
# ORCH-1 (F-2b): Persistent Job Queue
|
||||
|
||||
**Дата:** 2026-06-02
|
||||
**Ветка:** `feature/ORCH-1-job-queue`
|
||||
**Источник:** AUDIT_2026-06-02 (B-2 / F-2b)
|
||||
|
||||
## Проблема
|
||||
|
||||
Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил
|
||||
`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в
|
||||
процессе uvicorn, из **8 webhook-точек**. Последствия:
|
||||
|
||||
- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты,
|
||||
работа теряется (M-1 лишь помечал `exit=-1` и звал человека).
|
||||
- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude.
|
||||
- **Нет ретраев** — упавший агент просто мёртв.
|
||||
|
||||
## Решение
|
||||
|
||||
Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер:
|
||||
|
||||
1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200.
|
||||
2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает
|
||||
jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента
|
||||
(`launcher.launch_job`, та же Popen-логика).
|
||||
3. По завершении `_monitor_agent` → `_finalize_job`:
|
||||
- `exit 0` → `done`;
|
||||
- `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
|
||||
- `exit != 0` & `attempts >= max_attempts` → `failed` + Telegram.
|
||||
|
||||
## Что изменено
|
||||
|
||||
| Файл | Изменение |
|
||||
|------|-----------|
|
||||
| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
|
||||
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
|
||||
| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch` → `enqueue_job` |
|
||||
| `src/webhooks/plane.py` | 4 точки `launcher.launch` → `enqueue_job` |
|
||||
| `src/webhooks/gitea.py` | 4 точки `launcher.launch` → `enqueue_job` |
|
||||
| `src/queue_worker.py` | **НОВЫЙ** — `QueueWorker` (drain loop + max_concurrency + graceful stop) |
|
||||
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` |
|
||||
| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
|
||||
|
||||
## Атомарность claim
|
||||
|
||||
```sql
|
||||
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
|
||||
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
|
||||
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
|
||||
```
|
||||
|
||||
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
|
||||
(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs).
|
||||
|
||||
## Сохранённые фиксы (НЕ сломаны)
|
||||
|
||||
- **B-1** task-file write (direct `open()` в worktree) — без изменений.
|
||||
- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
|
||||
- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него.
|
||||
- **ORCH-2** worktree per task — без изменений.
|
||||
- **ORCH-6** project registry/filter — без изменений.
|
||||
|
||||
## Acceptance
|
||||
|
||||
| # | Проверка | Статус |
|
||||
|---|----------|--------|
|
||||
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
|
||||
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
|
||||
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
|
||||
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
|
||||
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
|
||||
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
|
||||
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
|
||||
| 8 | `/queue` | ✅ counts + recent |
|
||||
|
||||
## Тесты
|
||||
|
||||
```bash
|
||||
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
|
||||
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
|
||||
--entrypoint python3 $IMG -m pytest tests/ -q
|
||||
# 76 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
|
||||
```
|
||||
Reference in New Issue
Block a user