From c6d347b09cdb8825615fbf20ff95884ec0f06882 Mon Sep 17 00:00:00 2001 From: Stream Date: Tue, 2 Jun 2026 23:50:16 +0300 Subject: [PATCH] auto-sync: 2026-06-02 23:50:01 --- tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md | 194 +++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md diff --git a/tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md b/tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md new file mode 100644 index 0000000..85a1aa0 --- /dev/null +++ b/tasks/orchestrator/DEV_TASK_ORCH1_QUEUE.md @@ -0,0 +1,194 @@ +# DEV TASK: ORCH-1 (F-2b) — Персистентная очередь задач вместо in-process daemon-потоков + +**Статус:** Ready for dev +**Проект:** orchestrator +**Plane:** ORCH-1 (project `8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a`, prefix ORCH) +**Источник:** `tasks/orchestrator/AUDIT_2026-06-02.md` (F-2b) +**Исполнитель:** Dev-агент (model: tokenator/claude-opus-4-8) +**Приоритет:** 🟠 №1 в бэклоге после ORCH-6 + +--- + +## Проблема (зачем задача) + +Сейчас агенты запускаются **in-process** (`launcher.launch()`): +- `subprocess.Popen` + 2 `daemon`-thread (`_watchdog` + `_monitor_agent`) прямо в процессе uvicorn +- **8 точек вызова** из webhook-хэндлеров (plane ×4, gitea ×4), вызов СИНХРОННЫЙ → webhook-хэндлер ждёт спавна +- **Рестарт orchestrator = катастрофа:** daemon-threads умирают, claude-процессы становятся сиротами (reparent to init), M-1 orphan-recovery лишь помечает их `exit=-1` и зовёт человека. Работа теряется. +- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude (RAM/CPU перегруз) +- **Нет ретраев** — упавший агент просто мёртв + +## Цель + +Ввести **персистентную очередь задач** (job queue в SQLite) + **воркер-петлю**: +- Webhook-хэндлеры больше НЕ спавнят процессы напрямую → кладут job в очередь (`enqueue`) и быстро отвечают +- Отдельный фоновый воркер забирает jobs, уважает `max_concurrency`, спавнит claude (переиспользуя текущую Popen-логику), обновляет статус +- **Рестарт-safe:** при старте — running-jobs возвращаются в очередь (requeue) с учётом attempts +- **Ретраи:** упавший job (exit != 0) ретраится до `max_attempts`, потом `failed` + нотификация + +**Критерий приёмки:** webhook кладёт job (мгновенный ответ), воркер исполняет последовательно/с лимитом, рестарт не теряет задачи, упавшие ретраятся. + +--- + +## Инфраструктура + +| Параметр | Значение | +|----------|----------| +| Сервер | `slin@82.22.50.71` (SSH key) | +| Репо | `/home/slin/repos/orchestrator/` (remote `admin/orchestrator`) | +| Контейнер | `orchestrator` (port 8500, network host) | +| Health | `curl -s http://localhost:8500/health` | +| Тесты | `IMG=$(docker inspect orchestrator --format '{{.Config.Image}}'); docker run --rm -v /home/slin/repos/orchestrator:/code -w /code --entrypoint python3 $IMG -m pytest tests/ -q` | +| БД | `/app/data/orchestrator.db` (внутри контейнера) | + +⚠️ Хостовый `.venv` сломан — тесты ТОЛЬКО через образ. +⚠️ Тесты `test_webhooks.py`: 9 pre-existing падений (401/signature/TypeError) — НЕ твои, не трогать, но и не сломать остальные. + +--- + +## Текущая архитектура (собрано Стрим — не ищи вслепую) + +### `src/agents/launcher.py` (597 строк) +| Строка | Что | +|--------|-----| +| `58: def launch(self, agent, repo, task_content=None, task_id=None) -> int` | синхронный спавн | +| `~95: INSERT INTO agent_runs (task_id, agent)` → `run_id` | запись прогона | +| `131: proc = subprocess.Popen(["bash","-c",cmd], stdout=log_fh, ...)` | спавн claude | +| `154: threading.Thread(target=self._watchdog, daemon=True).start()` | таймаут-киллер | +| `163: threading.Thread(target=self._monitor_agent, daemon=True).start()` | ждёт, коммитит, advance stage | +| `_watchdog(pid, run_id, timeout=1800)` | `time.sleep(timeout)` → `os.kill` | +| `_monitor_agent(proc, run_id, agent, repo, branch, output_path, log_fh)` | `proc.wait()`, commit+push, advance stage, может вызвать след. `launch` | +| `AGENT_TIMEOUT = 1800` | 30 мин | + +### Точки вызова `launcher.launch(...)` (8 шт) +- `plane.py:189, 234, 308, 389` +- `gitea.py:126, 203, 275, 300` + +### `src/db.py` — схема +```sql +tasks(id, plane_id, work_item_id, repo, branch, stage, agent_running, created_at, updated_at, plane_issue_id) +agent_runs(id, task_id→tasks, agent, started_at, finished_at, exit_code, output_path) +events(id, timestamp, source, event_type, payload, processed) +``` + +### `src/main.py` — lifespan +- `init_db()` + **M-1 orphan-recovery** (running agent_runs старше 35 мин → `exit=-1` + Telegram). Эту логику НЕ ломать, но дополнить queue-recovery. + +--- + +## Архитектура решения + +### 1. Новая таблица `jobs` (очередь) в `db.py` +```sql +CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent TEXT NOT NULL, + repo TEXT NOT NULL, + task_id INTEGER, -- FK tasks.id (nullable) + task_content TEXT, -- то, что сейчас идёт в task_file + status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 2, + run_id INTEGER, -- agent_runs.id когда стартовал + error TEXT, -- последняя ошибка + created_at TEXT DEFAULT (datetime('now')), + started_at TEXT, + finished_at TEXT +); +CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); +``` +Хелперы в `db.py`: `enqueue_job(agent, repo, task_content, task_id, max_attempts=2) -> job_id`, `claim_next_job() -> dict|None` (атомарно: SELECT queued ORDER BY id LIMIT 1 → UPDATE running, used WHERE status='queued' для гонок), `mark_job(job_id, status, run_id=None, error=None)`, `count_running_jobs()`, `requeue_running_jobs()` (для recovery), `get_job(job_id)`. + +### 2. `src/queue_worker.py` (НОВЫЙ) — воркер-петля +- Конфиг: `settings.max_concurrency` (новый, дефолт 1), `settings.queue_poll_interval` (дефолт 2.0 сек). +- Цикл (asyncio task, запущен в lifespan ИЛИ отдельный thread — выбери проще/надёжнее; рекомендую **отдельный поток с while-loop + threading.Event для shutdown**, т.к. launcher уже sync/threads): + 1. Пока `count_running_jobs() < max_concurrency`: `job = claim_next_job()`; если None → break. + 2. Для claimed job → вызвать `launcher.launch_job(job)` (см. п.3) НЕблокирующе. + 3. `sleep(poll_interval)`. +- Воркер стартует в `main.py` lifespan (после recovery), останавливается на shutdown (Event.set + join с таймаутом). + +### 3. Рефактор `launcher.py` +- **Сохрани** существующую Popen+monitor+watchdog логику (B-1/B-2/M-1/ORCH-2 фиксы НЕ ломать!). +- Добавь `launch_job(self, job: dict) -> int`: то же, что `launch()`, но: + - При INSERT agent_runs запиши `run_id`, обнови `jobs.run_id`, `jobs.started_at`. + - В `_monitor_agent` по завершении: если `exit_code == 0` → `mark_job(job_id, 'done')`; иначе → `attempts+1`; если `attempts < max_attempts` → вернуть в `queued` (`mark_job(job_id,'queued')`); иначе `mark_job(job_id,'failed', error=...)` + Telegram-нотификация. (Передай `job_id` в monitor через args.) + - `_watchdog` при таймаут-килле → тоже трактуй как fail (ретрай/failed). +- **Старый `launch()`** оставь как тонкую обёртку для обратной совместимости тестов: `enqueue_job(...)` и верни job_id? ⚠️ НЕТ — это сменит семантику (возвращал run_id). Вместо этого: оставь `launch()` рабочим для прямого синхронного запуска (тесты на него опираются), а воркер использует `launch_job()`. Если проще — пусть `launch()` дергает ту же внутреннюю реализацию `_spawn(agent, repo, task_content, task_id, job_id=None)`. Реши по факту, главное — **существующие тесты launcher НЕ красные**. + +### 4. Webhook-хэндлеры → enqueue +8 точек `launcher.launch(agent, repo, task_desc, task_id=task_id)` → заменить на `enqueue_job(agent, repo, task_desc, task_id=task_id)`. +⚠️ `_monitor_agent` сейчас сам вызывает следующий `launch` (advance stage внутри). Это ОК — пусть внутри monitor advance-цепочка тоже идёт через `enqueue_job` (положит следующий job в очередь, воркер подхватит). Проверь все внутренние `launch`-вызовы в launcher. + +### 5. `main.py` lifespan +- После M-1 orphan-recovery (оставить) добавь **queue-recovery**: `requeue_running_jobs()` — jobs со `status='running'` при старте → вернуть в `queued` (процесс умер на рестарте), attempts не трогать или +0. Логировать. +- Запустить воркер-петлю. На shutdown — остановить. + +### 6. `config.py` +```python +max_concurrency: int = 1 # ORCH-1: parallel agent jobs +queue_poll_interval: float = 2.0 # ORCH-1: worker poll seconds +``` + +### 7. `/status` и новый `/queue` эндпоинт +- Дополнить `/status` или добавить `GET /queue`: counts по статусам (queued/running/done/failed) + последние 10 jobs. Для наблюдаемости. + +--- + +## Задачи (по шагам) + +- [ ] **T1.** Прочитать `AUDIT_2026-06-02.md`, `launcher.py`, `db.py`, `main.py`, `webhooks/*.py`. Ветка `feature/ORCH-1-job-queue`, `git status` чистый. +- [ ] **T2.** `db.py`: таблица `jobs` + индекс + хелперы (enqueue/claim/mark/count/requeue/get). `claim_next_job` атомарный (защита от гонки воркера). +- [ ] **T3.** `config.py`: `max_concurrency`, `queue_poll_interval`. +- [ ] **T4.** `launcher.py`: `launch_job()` + рефактор monitor/watchdog на job-статусы + ретраи. НЕ ломать B-1/B-2/M-1/ORCH-2. +- [ ] **T5.** Webhook-хэндлеры (8 точек) + внутренние advance-вызовы → `enqueue_job`. +- [ ] **T6.** `queue_worker.py` + запуск/останов в `main.py` lifespan + queue-recovery. +- [ ] **T7.** `/queue` (или расширить `/status`). +- [ ] **T8.** Тесты `tests/test_queue.py`: enqueue→claim→mark; claim атомарность (нет двойной выдачи); ретрай (fail→queued пока attempts