Compare commits
21 Commits
feature/OR
...
feature/OR
| Author | SHA1 | Date |
|---|---|---|
| | c23f000c05 | |
| | d0d47058b4 | |
| | a613fd8180 | |
| | f314ae09e5 | |
| | 90fdd19394 | |
| | 4ef87a3959 | |
| | 0cd9b11fe0 | |
| | 4be168c0ec | |
| | 2283b8898b | |
| | b6d4426a48 | |
| | 20d6556e22 | |
| | 3345c2fa0a | |
| | fd3dac7d22 | |
| | b021ff7cb0 | |
| | ca81f38330 | |
| | c1f35a2047 | |
| | a6f6a43c1c | |
| | 171f4eb304 | |
| | a87c633003 | |
| | 0797f958dc | |
| | 36d5f25f2a | |
75
README.md
@@ -39,6 +39,7 @@ created → analysis → architecture → development → review → testing →
 |--------|------|----------|
 | GET | `/health` | Health check |
 | GET | `/status` | Active tasks (stage != done) |
+| GET | `/queue` | Task queue (ORCH-1): counts by status + max_concurrency + last 10 jobs |
 | POST | `/webhook/plane` | Plane webhook receiver |
 | POST | `/webhook/gitea` | Gitea webhook receiver |
@@ -52,8 +53,9 @@ src/
 ├── stages.py # State machine (transitions, agents, QG)
 ├── notifications.py # Notifications (logging)
 ├── plane_sync.py # Status sync with the Plane API
+├── queue_worker.py # ORCH-1: background queue worker (claim → launch_job)
 ├── agents/
-│ └── launcher.py # AgentLauncher: launch, monitor, watchdog, auto-advance
+│ └── launcher.py # AgentLauncher: launch/launch_job, monitor, watchdog, auto-advance
 ├── webhooks/
 │ ├── plane.py # Plane webhook handler
 │ └── gitea.py # Gitea webhook handler (push, PR, CI status)
@@ -101,11 +103,80 @@ uvicorn src.main:app --reload --port 8500
 | `ORCH_GITEA_TOKEN` | Gitea API token | — |
 | `ORCH_GITEA_WEBHOOK_SECRET` | Gitea webhook secret | — |
 | `ORCH_GITEA_OWNER` | Gitea repo owner | `admin` |
-| `ORCH_DEFAULT_REPO` | Default repository | `enduro-trails` |
+| `ORCH_DEFAULT_REPO` | Default repository (fallback) | `enduro-trails` |
+| `ORCH_PROJECTS_JSON` | Multi-repo registry (JSON array, ORCH-6) | `""` → default in `src/projects.py` |
 | `ORCH_CLAUDE_BIN` | Path to the Claude CLI | `/opt/claude-code/bin/claude.exe` |
 | `ORCH_REPOS_DIR` | Repos dir (container) | `/repos` |
 | `ORCH_HOST_REPOS_DIR` | Repos dir (host) | `/home/slin/repos` |
 | `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` |
+| `ORCH_MAX_CONCURRENCY` | Number of jobs the worker runs in parallel (ORCH-1) | `1` |
+| `ORCH_QUEUE_POLL_INTERVAL` | Worker queue poll interval, seconds (ORCH-1) | `2.0` |
+| `ORCH_PREFLIGHT_CACHE_TTL` | Preflight (CLI/net) cache TTL, seconds (ORCH-1 resilience) | `45` |
+| `ORCH_BACKOFF_BASE_SECONDS` | Exponential backoff base for transient errors (429) | `10` |
+| `ORCH_BACKOFF_MAX_SECONDS` | Backoff ceiling | `600` |
+| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Retries for 429/unavailability | `5` |
+| `ORCH_BREAKER_THRESHOLD` | Consecutive transient failures before the breaker opens | `3` |
+| `ORCH_BREAKER_PAUSE_SECONDS` | Pause while the breaker is open | `300` |
## Task queue (ORCH-1 / F-2b)

Webhook handlers no longer spawn claude agents synchronously inside the uvicorn process.
Instead they put a **job** into the persistent SQLite table `jobs`
(`enqueue_job`, immediate response), and a background worker (`src/queue_worker.py`)
picks up jobs within the `ORCH_MAX_CONCURRENCY` limit and launches the agent (`launch_job`,
the same Popen logic as before).

Benefits:
- **Restart-safe.** On startup, jobs with status `running` are returned to `queued`
  (queue-recovery in lifespan), so no work is lost.
- **Concurrency limit.** The worker never exceeds `ORCH_MAX_CONCURRENCY`.
- **Retries.** A failed job (exit≠0) is retried while `attempts < max_attempts`,
  then it is marked `failed` and a Telegram notification is sent.

Job statuses: `queued → running → done | failed`. Observability is provided by `GET /queue`.

**Resilience layer:** a cheap preflight (CLI/net, cached, no tokens spent) gates the claim;
429/overload is detected from the log (transient vs permanent); transient failures are retried with
exponential backoff (`available_at`, Retry-After); a circuit breaker pauses the worker after N
consecutive transient failures. Details: `docs/ORCH-1_JOB_QUEUE.md`.

## Multi-repo: project registry (ORCH-6)

The orchestrator serves several repositories through a project registry
(`src/projects.py`), keyed by **Plane project id**. The Plane webhook filters events
by project (unknown project → `ignored`) and resolves `repo` / `work_item_prefix` /
Plane project from the mapping.

By default (when `ORCH_PROJECTS_JSON` is empty) two projects are registered:

| Project | Plane project id | repo | prefix |
|--------|------------------|------|--------|
| enduro-trails | `7a79f0a9-5278-49cd-9007-9a338f238f9c` | `enduro-trails` | `ET` |
| orchestrator | `8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a` | `orchestrator` | `ORCH` |

### How to add a new project

1. Make sure the gitea repo is already cloned into `/repos/<repo>` (auto-clone is a separate item).
2. Find the Plane project uuid (from the project URL in Plane or via the Plane API).
3. Add an entry to `ORCH_PROJECTS_JSON` in `.env` (JSON array). **Important:** if
   you set `ORCH_PROJECTS_JSON`, it fully replaces the default, so list **all**
   required projects (including enduro-trails and orchestrator):

```bash
ORCH_PROJECTS_JSON='[
  {"plane_project_id":"7a79f0a9-5278-49cd-9007-9a338f238f9c","repo":"enduro-trails","work_item_prefix":"ET","name":"enduro-trails"},
  {"plane_project_id":"8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a","repo":"orchestrator","work_item_prefix":"ORCH","name":"orchestrator"},
  {"plane_project_id":"<new-uuid>","repo":"<new-repo>","work_item_prefix":"<PREFIX>","name":"<name>"}
]'
```

4. Rebuild: `docker compose up -d --build`.
5. Verify the resolution:
```bash
docker exec orchestrator python3 -c "from src.projects import get_project_by_plane_id as g; print(g('<new-uuid>'))"
```

The `name` field is optional (defaults to `repo`). Details: `docs/ARCHITECTURE.md`.

## Key mechanisms

@@ -9,9 +9,39 @@ Orchestrator — event-driven FastAPI сервис, который управл
 ### 1. Webhook Receivers

 #### Plane Webhook (`src/webhooks/plane.py`)
-- Receives `work_item.created`: creates a task in the DB and launches the analyst
+- **Project filter (ORCH-6):** extracts `data.project` (Plane project uuid) and ignores the event if the project is not in the registry (`known_plane_project_ids()`) → response `{"status":"ignored","reason":"unknown project"}`. This prevents a repeat of the 2026-06-02 incident (a workspace-wide webhook with no filter).
+- Receives `work_item.created`: resolves repo/prefix/Plane project from the registry by `project`, creates a task in the DB, and launches the analyst
 - Receives `work_item.updated`: status synchronization

#### Project registry (`src/projects.py`, multi-repo, ORCH-6)
A mapping of **Plane project id → (repo, work_item_prefix, name)**. It lets a single
orchestrator serve several repositories without mixing them up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProjectConfig:
    plane_project_id: str  # uuid of the Plane project (registry key)
    repo: str              # gitea repo name (= folder in /repos)
    work_item_prefix: str  # ET / ORCH
    name: str              # human-readable name
```

Resolvers:
- `get_project_by_plane_id(uuid) -> ProjectConfig | None`: used for filtering/resolving in the plane webhook.
- `get_project_by_repo(repo) -> ProjectConfig | None`: used when only the repo is known (gitea webhook, plane_sync).
- `known_plane_project_ids() -> set[str]`: the set of allowed projects (the filter).

**Configuration source:** env `ORCH_PROJECTS_JSON` (a JSON array of `ProjectConfig`).
If it is empty or invalid JSON, the built-in default registry (enduro-trails + orchestrator) is used,
so the system works out of the box. Parsing is resilient: broken entries are skipped, and
fully invalid JSON falls back to the default.

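The resilient parsing described above could look roughly like the sketch below. This is an illustration under assumptions, not the code in `src/projects.py`: the names `parse_projects`, `DEFAULT_PROJECTS` and `REQUIRED` are hypothetical, and treating a non-string or empty field as a "broken entry" is a guess at the validation rule.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ProjectConfig:
    plane_project_id: str
    repo: str
    work_item_prefix: str
    name: str


# Hypothetical stand-in for the built-in default registry.
DEFAULT_PROJECTS = [
    ProjectConfig("7a79f0a9-5278-49cd-9007-9a338f238f9c", "enduro-trails", "ET", "enduro-trails"),
    ProjectConfig("8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a", "orchestrator", "ORCH", "orchestrator"),
]

# Assumed set of mandatory fields; `name` is optional.
REQUIRED = ("plane_project_id", "repo", "work_item_prefix")


def parse_projects(raw: str) -> list[ProjectConfig]:
    """Parse ORCH_PROJECTS_JSON, falling back to the default on unusable input."""
    if not raw.strip():
        return list(DEFAULT_PROJECTS)
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return list(DEFAULT_PROJECTS)
    if not isinstance(data, list):
        return list(DEFAULT_PROJECTS)

    projects = []
    for item in data:
        # Skip broken entries instead of rejecting the whole registry.
        if not isinstance(item, dict):
            continue
        if not all(isinstance(item.get(key), str) and item[key] for key in REQUIRED):
            continue
        name = item.get("name")
        if not isinstance(name, str) or not name:
            name = item["repo"]  # `name` defaults to `repo`
        projects.append(
            ProjectConfig(item["plane_project_id"], item["repo"], item["work_item_prefix"], name)
        )
    # If every entry was broken, behave as if nothing was configured.
    return projects or list(DEFAULT_PROJECTS)
```

Returning the default when every entry is broken mirrors the "all-bad → fallback" case mentioned in the test list; a stricter design could raise instead, at the cost of the service not starting on a typo.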
Consequences of multi-repo:
- **repo per project:** `repo = get_project_by_plane_id(project_id).repo` instead of the hardcoded `default_repo`.
- **prefix per project:** `get_next_work_item_id(repo, prefix)` numbers each prefix independently, e.g. `ORCH-001` vs `ET-010` (`src/db.py`).
- **plane_sync targets the right project:** state/comment are written to the task's own Plane project (resolved by repo via `get_project_by_repo`) rather than to the single hardcoded `PROJECT_ID` (backward compatibility is preserved by defaulting to enduro).
- **gitea-webhook:** a push to a repo outside the registry → `ignored` (does not trigger the pipeline).

#### Gitea Webhook (`src/webhooks/gitea.py`)
- **push**: checks for artifacts (docs/, src/) and advances the stage
- **pull_request\*** (wildcard): handles review approved/rejected and PR merge
@@ -234,9 +264,71 @@ services:

 - ~~Shared `/repos` checkout (races between parallel tasks).~~ **RESOLVED (ORCH-2 / S-4):**
   git worktree per task/branch; see the section "Isolation via git worktree" below.
-- **In-process daemon threads.** Agents run in uvicorn daemon threads. When
-  uvicorn restarts, running agents are orphaned → caught by orphan-recovery (M-1).
-  Target architecture: a task queue (F-2b, separate).
+- ~~In-process daemon threads (restart → orphans, lost work).~~ **RESOLVED (ORCH-1 / F-2b):**
+  persistent jobs queue + background worker; see the section "Task queue (ORCH-1)" below.
+  The monitor/watchdog daemon threads remain for a single running agent, but on
+  restart its job returns to `queued` (queue-recovery) and is picked up again.

## Task queue (ORCH-1 / F-2b)

Previously the webhook handler **synchronously** spawned `subprocess.Popen` + 2 daemon threads
right inside the uvicorn process (8 call sites). A restart meant orphans and lost work;
there was no concurrency limit and no retries.

### Flow

```
webhook (plane/gitea)                      background thread (queue_worker)
       │                                            │
  enqueue_job() ---> [ jobs table ] <--- claim_next_job()  (atomic queued->running)
  (immediate          status=queued                 │
   200 response)                              launch_job(job)
                                                    │
                                      AgentLauncher._spawn (Popen claude)
                                                    │
                                      _monitor_agent (proc.wait, commit/push,
                                                    │         advance stage)
                                                    │
                                              _finalize_job:
                                                exit 0                    -> mark_job done
                                                exit !=0 & attempts<max   -> requeue (queued)
                                                exit !=0 & attempts>=max  -> failed + Telegram
```

### The `jobs` table

| Column | Purpose |
|--------|------------|
| `status` | `queued` → `running` → `done` \| `failed` |
| `attempts` / `max_attempts` | attempt counter (incremented on claim) / retry limit (default 2) |
| `run_id` | FK to `agent_runs.id` once the agent has started |
| `task_content` | the task spec that is written to the agent's task file |
| `error` | last error |

`idx_jobs_status (status, id)` provides fast FIFO selection of queued jobs.

### Atomic claim

`claim_next_job()` runs `SELECT queued ORDER BY id LIMIT 1` → `UPDATE ... WHERE id=? AND status='queued'`
and checks `rowcount`. When two ticks race, only one UPDATE
moves the row to `running` (rowcount==1); the loser takes the next job.

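A minimal sketch of this select-then-guarded-update pattern with the standard `sqlite3` module is shown below. It is an assumption-laden illustration rather than the code in `src/db.py`: the reduced schema in `MINIMAL_SCHEMA` is hypothetical, and returning the job id (instead of a full row) is a simplification.

```python
import sqlite3

# Hypothetical, reduced schema: only the columns this sketch needs.
MINIMAL_SCHEMA = (
    "CREATE TABLE IF NOT EXISTS jobs ("
    "id INTEGER PRIMARY KEY, status TEXT NOT NULL, attempts INTEGER NOT NULL DEFAULT 0)"
)


def claim_next_job(conn: sqlite3.Connection):
    """Claim the oldest queued job; return its id, or None if nothing is queued."""
    while True:
        row = conn.execute(
            "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        cur = conn.execute(
            "UPDATE jobs SET status='running', attempts=attempts+1 "
            "WHERE id=? AND status='queued'",
            (row[0],),
        )
        conn.commit()
        if cur.rowcount == 1:
            return row[0]
        # rowcount == 0: another tick claimed this row first; try the next one.
```

The `AND status='queued'` guard in the UPDATE is what makes the claim safe: the SELECT only proposes a candidate, and the UPDATE decides the winner.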
### Queue-recovery (restart-safe)

In the `main.py` lifespan, `requeue_running_jobs()` is called **after** the M-1 orphan-recovery:
jobs with status `running` (the worker died on restart) are returned to `queued`.
Then the worker starts; on shutdown, `worker.stop()` is called (Event.set + join).

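The recovery step itself can be a single statement. The sketch below assumes a `jobs` table with a `status` column and returns the number of requeued rows; that return value is an assumption made for illustration, and the real helper in `src/db.py` may differ.

```python
import sqlite3


def requeue_running_jobs(conn: sqlite3.Connection) -> int:
    """Return jobs left in `running` by a previous process to `queued`."""
    cur = conn.execute("UPDATE jobs SET status='queued' WHERE status='running'")
    conn.commit()
    return cur.rowcount  # how many jobs were recovered
```

This is only safe when it runs before the worker starts, which matches the ordering described above: at that point no job can legitimately be `running`.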
### Config

- `ORCH_MAX_CONCURRENCY` (default 1): limit on parallel jobs.
- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0): poll interval.

Observability: `GET /queue` returns counts by status + the last 10 jobs.

> Compatibility: `launcher.launch()` (direct synchronous launch, `job_id=None`)
> is kept for backward compatibility. The queue uses `launch_job()`;
> both share `_spawn()` (the B-2 Popen logic is unchanged).
- **Gitea CI is not configured.** The development QG is now local (`check_tests_local`);
  Gitea CI statuses are not authoritative and do not block the pipeline.
- **Docker inside the orchestrator container is NOT AVAILABLE.** Deployment goes only through
82
docs/BUGFIXES_2026-06-03.md
Normal file
@@ -0,0 +1,82 @@
# BUGFIXES / CHANGES (2026-06-03)

## ORCH-6: Multi-repo project filter + repo-per-project mapping

**Type:** root fix for the incident + new capability (multi-repo)
**Branch:** `feature/ORCH-6-multirepo`
**Plane:** ORCH-6 (project `8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a`)
**Related incident:** [`INCIDENT_2026-06-02_webhook_autorun.txt`](./INCIDENT_2026-06-02_webhook_autorun.txt)

### Incident context

When tasks ORCH-1..7 were created in Plane (project `orchestrator`), the Plane webhook
(id `93f0c342-a614-4248-9d0f-c107276f5620`) fired for every task and started
the pipeline, but **everything went to the `enduro-trails` repo**, because `plane.py:91`
hardcoded `repo = settings.default_repo`. The webhook listened to the **entire workspace with no
project filter**, producing junk tasks ET-010..016.

Mitigation while the fix was in progress: the Plane webhook was **deactivated** (`is_active=false`).

### Root cause

1. No filter by Plane project: any issue from any project entered the pipeline.
2. `repo` was hardcoded to the single `default_repo` (enduro-trails).
3. `work_item_prefix` was always `ET` (db.py).
4. `plane_sync` talked to the single hardcoded `PROJECT_ID` (enduro).

### What was done

| File | Change |
|------|-----------|
| `src/projects.py` (new) | Project registry: `ProjectConfig` + default list (enduro-trails + orchestrator) + resolvers `get_project_by_plane_id` / `get_project_by_repo` / `known_plane_project_ids`. The override source is `ORCH_PROJECTS_JSON`; parsing is resilient (invalid JSON / broken entries → fallback to the default). |
| `src/config.py` | Added `projects_json: str = ""` (env `ORCH_PROJECTS_JSON`). |
| `src/webhooks/plane.py` | **Project filter**: `data.project` not in the registry → `{"status":"ignored","reason":"unknown project"}`. `repo`/`prefix`/Plane project are resolved from the registry. Plane sync for a task goes to the task's own project. |
| `src/db.py` | `get_next_work_item_id(repo, prefix="ET")`: numbering per (repo, prefix); `ORCH-001` is independent of `ET-010`. The `ET` default is kept for backward compatibility. |
| `src/plane_sync.py` | `_resolve_project_id` + a `project_id` parameter (defaults to enduro → existing calls stay backward compatible). |
| `src/webhooks/gitea.py` | Unknown repo (`get_project_by_repo` → None) → `ignored` in 3 handlers. |

### Tests

- `tests/test_projects.py` (16 tests): resolvers (by plane_id, by repo, unknown→None,
  known_plane_project_ids), parsing of `ORCH_PROJECTS_JSON` (valid / invalid JSON / not an array /
  broken entries → skip / all-bad → fallback), reload with a custom JSON.
- `tests/test_plane_webhook.py` (4 tests, FastAPI TestClient, `launcher.launch` mocked):
  unknown project → `ignored` + no task/branch/agent; orchestrator project → `repo=orchestrator`,
  `ORCH-*`; enduro project → `repo=enduro-trails`, `ET-*`; independent prefixes (`ORCH-001`/`ORCH-002`
  in parallel with `ET-001`).

**Run (in a container, image `orchestrator-orchestrator`):** `57 passed`. The 9 failures in
`tests/test_webhooks.py` are **pre-existing** (webhook signature 401 / TypeError, unrelated to ORCH-6,
left untouched).

```bash
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code --entrypoint python3 $IMG -m pytest tests/ -q
```

### Resolution check (offline, in the running container)

```bash
docker exec orchestrator python3 -c "
from src.projects import get_project_by_plane_id, known_plane_project_ids
o = get_project_by_plane_id('8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a')
e = get_project_by_plane_id('7a79f0a9-5278-49cd-9007-9a338f238f9c')
assert o.repo=='orchestrator' and o.work_item_prefix=='ORCH'
assert e.repo=='enduro-trails' and e.work_item_prefix=='ET'
assert get_project_by_plane_id('00000000-0000-0000-0000-000000000000') is None
print('RESOLVE OK:', o.repo, e.repo, '| known:', len(known_plane_project_ids()))
"
```

### ⚠️ Important

- The Plane webhook **stays disabled** (`is_active=false`). Re-enabling it is a separate
  step, performed by Стрим after the PR review.
- `ORCH_PROJECTS_JSON` (if set) **fully replaces** the default, so list all required projects.
- Backward compatibility of `plane_sync` is preserved (default project_id = enduro); ET tasks are not broken.

### Re-enable webhook (after review, done by Стрим)

```sql
UPDATE webhooks SET is_active=true WHERE id='93f0c342-a614-4248-9d0f-c107276f5620';
```
7
docs/INCIDENT_2026-06-02_webhook_autorun.txt
Normal file
@@ -0,0 +1,7 @@
INCIDENT 2026-06-02: Plane webhook auto-triggered pipeline for ALL ORCH-1..7 tasks
- Plane webhook (id 93f0c342) fires on ANY issue creation in workspace, no project filter
- plane.py:91 hardcodes repo=settings.default_repo (enduro-trails)
- Result: ORCH-x tasks ran analyst/architect in WRONG repo (enduro-trails), created junk ET-010..016
- MITIGATION: Plane webhook DEACTIVATED (is_active=false) until ORCH-6 adds project filter
- ROOT FIX = ORCH-6 (multi-repo): filter by plane_project_id + repo mapping per project
- To re-enable webhook after ORCH-6: UPDATE webhooks SET is_active=true WHERE id=93f0c342...
127
docs/ORCH-1_JOB_QUEUE.md
Normal file
@@ -0,0 +1,127 @@
# ORCH-1 (F-2b): Persistent Job Queue

**Date:** 2026-06-02
**Branch:** `feature/ORCH-1-job-queue`
**Source:** AUDIT_2026-06-02 (B-2 / F-2b)

## Problem

Agents were launched **in-process**: `launcher.launch()` synchronously spawned
`subprocess.Popen` + 2 daemon threads (`_watchdog`, `_monitor_agent`) right inside
the uvicorn process, from **8 webhook call sites**. Consequences:

- **A restart was a catastrophe.** Daemon threads die, claude processes become orphans,
  and work is lost (M-1 only marked `exit=-1` and called for a human).
- **No concurrency limit:** N webhooks = N simultaneous claude processes.
- **No retries:** a failed agent simply stays dead.

## Solution

A persistent task queue (SQLite table `jobs`) + a background worker:

1. The webhook handler enqueues a job (`enqueue_job`) → immediate 200 response.
2. The background worker (`src/queue_worker.py`, a separate daemon thread) picks up
   jobs within `max_concurrency` (`claim_next_job`, atomically) and spawns the agent
   (`launcher.launch_job`, the same Popen logic).
3. On completion, `_monitor_agent` → `_finalize_job`:
   - `exit 0` → `done`;
   - `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
   - `exit != 0` & `attempts >= max_attempts` → `failed` + Telegram.

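The three outcomes of step 3 reduce to a small pure function. The sketch below shows only the decision; the function name `next_job_status` is hypothetical, and the real `_finalize_job` also updates the database and sends the Telegram notification.

```python
def next_job_status(exit_code: int, attempts: int, max_attempts: int = 2) -> str:
    """Map an agent exit code and the attempt counter to the next job status."""
    if exit_code == 0:
        return "done"
    if attempts < max_attempts:
        return "queued"  # requeue for another attempt
    return "failed"      # retry budget exhausted; the caller notifies


# `attempts` is incremented at claim time, so the first run finishes with attempts == 1.
print(next_job_status(exit_code=1, attempts=1))  # second attempt is still allowed
```

With the default `max_attempts=2`, a job therefore gets at most two runs before it is marked `failed`.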
## What changed

| File | Change |
|------|-----------|
| `src/db.py` | `jobs` table + index; helpers `enqueue_job`, `claim_next_job` (atomic), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
| `src/agents/launcher.py` | `launch()` → thin wrapper around `_spawn()`; new `launch_job(job)`; `_spawn()` (shared, `job_id` optional); monitor/watchdog accept `job_id`; new `_finalize_job()` (statuses + retries). 4 internal advance calls `self.launch` → `enqueue_job` |
| `src/webhooks/plane.py` | 4 `launcher.launch` call sites → `enqueue_job` |
| `src/webhooks/gitea.py` | 4 `launcher.launch` call sites → `enqueue_job` |
| `src/queue_worker.py` | **NEW**: `QueueWorker` (drain loop + max_concurrency + graceful stop) |
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) after M-1, worker start/stop; new `GET /queue` |
| `tests/test_queue.py` | **NEW**: 19 tests (lifecycle, claim atomicity, retries, requeue, observability, worker max_concurrency; Popen fully mocked) |

## Claim atomicity

```sql
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
 WHERE id=? AND status='queued';   -- rowcount==1 => claimed, ==0 => lost the race
```

Guarantee: a job is never handed out twice, even with concurrent worker ticks
(verified by `test_concurrent_claims_no_duplicate`: 8 threads, 20 jobs).

## Preserved fixes (NOT broken)

- **B-1** task-file write (direct `open()` in the worktree): unchanged.
- **B-2** Popen → log_fh (no PIPE), monitor reap: unchanged, only wrapped.
- **M-1** orphan-recovery in `main.py`: kept; queue-recovery is added AFTER it.
- **ORCH-2** worktree per task: unchanged.
- **ORCH-6** project registry/filter: unchanged.

## Acceptance

| # | Check | Status |
|---|----------|--------|
| 1 | webhook enqueues a job (queued) | ✅ enqueue_job |
| 2 | worker executes queued→running→done | ✅ worker + _finalize_job |
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
| 4 | retry fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
| 5 | restart-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
| 6 | M-1 not broken | ✅ kept in lifespan |
| 7 | tests (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
| 8 | `/queue` | ✅ counts + recent |

## Tests

```bash
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
  --entrypoint python3 $IMG -m pytest tests/ -q
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
```

---

## Resilience layer (ADDENDUM: preflight + 429 + backoff + circuit breaker)

Queue reliability against CLI unavailability and rate limits. These are two DIFFERENT
classes of problems, and they are handled differently.

### A. Cheap preflight (`src/preflight.py`): burns no tokens
Before claiming, the worker checks `os.path.exists(CLAUDE_BIN)` + `claude --version`
(5 s timeout, spends NO tokens). The result is cached for `preflight_cache_ttl` (45 s).
On FAIL the worker does NOT claim (the job stays `queued`) and waits. 🚫 NO prompt-ping.

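A cached preflight along these lines might be sketched as follows. The class name `Preflight`, its `ok(force=...)` method and the injectable `clock` are assumptions made for illustration; only the two checks, the 5 s timeout and the 45 s TTL come from the text above.

```python
import os
import subprocess
import time


class Preflight:
    """Cheap CLI availability check with a TTL cache (illustrative sketch)."""

    def __init__(self, claude_bin: str, ttl: float = 45.0, clock=time.monotonic):
        self.claude_bin = claude_bin
        self.ttl = ttl
        self._clock = clock
        self._checked_at = None  # time of the last real probe
        self._ok = False

    def _probe(self) -> bool:
        # Both checks are local and spend no API tokens.
        if not os.path.exists(self.claude_bin):
            return False
        try:
            result = subprocess.run(
                [self.claude_bin, "--version"], capture_output=True, timeout=5
            )
        except (OSError, subprocess.TimeoutExpired):
            return False
        return result.returncode == 0

    def ok(self, force: bool = False) -> bool:
        now = self._clock()
        fresh = self._checked_at is not None and now - self._checked_at < self.ttl
        if force or not fresh:
            self._ok = self._probe()
            self._checked_at = now
        return self._ok
```

A worker tick would call `ok()` before `claim_next_job()` and simply skip the tick on `False`, so the job never leaves `queued`.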
### B. 429: detected ON EXIT (`src/error_classifier.py`)
A rate limit cannot be predicted, so the run is classified from its log. `classify_log_file`
reads the log tail (16KB), looks for `429/rate limit/overloaded/quota/503/529/timeout/...`
and returns `transient` or `permanent`. It also extracts `Retry-After`.

- **transient** (429/network) → backoff retry with a SEPARATE `transient_attempts` counter
  (limit `transient_max_attempts=5`), which does not burn the code-fault budget.
- **permanent** (code fault) → the regular `attempts < max_attempts` (2), then `failed`.

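As a rough illustration of log-based classification, the sketch below works on an in-memory string instead of a file. The function name `classify_log_tail`, the exact regular expressions and the `Retry-After` format are assumptions; the real marker list in `src/error_classifier.py` is longer (the source elides it with `...`).

```python
import re

# Assumed subset of the transient markers named in the text above.
TRANSIENT_PATTERNS = re.compile(
    r"\b429\b|rate.?limit|overloaded|quota|\b503\b|\b529\b|timeout", re.IGNORECASE
)
# Assumed shape of the header as it might appear in a log line.
RETRY_AFTER = re.compile(r"retry-after:?\s*(\d+)", re.IGNORECASE)


def classify_log_tail(text: str, tail_chars: int = 16 * 1024):
    """Return (kind, retry_after_seconds) for the tail of a run log."""
    tail = text[-tail_chars:]
    if not TRANSIENT_PATTERNS.search(tail):
        return "permanent", None
    match = RETRY_AFTER.search(tail)
    return "transient", int(match.group(1)) if match else None
```

Scanning only the tail keeps the check cheap on large logs and biases the decision toward the error that actually ended the run.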
### C. Backoff + `available_at`
Columns `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (migration via
`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL OR available_at <= datetime('now'))`.
On a transient failure: `available_at = now + min(2^n * base, max)` (base=10 s, max=600 s);
`Retry-After` is honored (the larger of the two values is taken).

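The delay formula can be checked with a few lines of code. In this sketch the function names are hypothetical, `n` is assumed to be the zero-based count of previous transient attempts, and the timestamp is formatted in UTC to match what SQLite's `datetime('now')` produces, so that the string comparison in the claim query behaves as intended.

```python
from datetime import datetime, timedelta, timezone


def backoff_delay(n: int, base: float = 10.0, cap: float = 600.0, retry_after=None) -> float:
    """Seconds to wait before the next attempt: min(2^n * base, cap), raised to Retry-After."""
    delay = min((2 ** n) * base, cap)
    if retry_after is not None:
        delay = max(delay, retry_after)  # honor the server hint when it is longer
    return delay


def available_at(delay: float, now=None) -> str:
    """Timestamp in the 'YYYY-MM-DD HH:MM:SS' (UTC) form used by SQLite datetime('now')."""
    now = now or datetime.now(timezone.utc)
    return (now + timedelta(seconds=delay)).strftime("%Y-%m-%d %H:%M:%S")
```

With the defaults the sequence is 10, 20, 40, 80, ... seconds and reaches the 600 s cap at `n = 6`.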
### D. Circuit breaker (`CircuitBreaker` in queue_worker)
N=3 consecutive transient failures → **open**: the worker pauses for `breaker_pause_seconds=300`,
does NOT touch the CLI at all, and sends a Telegram alert. After the pause → **half-open** (tries 1 job);
if it recovers (exit 0) → **closed**; on another transient failure → open again. The state lives in the
worker's memory and is exposed in `/queue.resilience`.
The launcher→breaker link goes through the `launcher.on_outcome` callback (no import cycle).

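The state machine described above can be sketched as a small in-memory class. This is not the `CircuitBreaker` from `src/queue_worker.py`: the method names `allow_claim` and `record_outcome`, the injectable `clock`, and the choice that a `permanent` outcome only resets the streak are all assumptions. The sketch also does not enforce the "1 job" probe limit in half-open; it only reports the state.

```python
import time


class CircuitBreaker:
    """closed -> open after N consecutive transient failures -> half-open after a pause."""

    def __init__(self, threshold: int = 3, pause_seconds: float = 300.0, clock=time.monotonic):
        self.threshold = threshold
        self.pause_seconds = pause_seconds
        self._clock = clock
        self._streak = 0        # consecutive transient failures
        self._opened_at = None  # None means the breaker is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.pause_seconds:
            return "half-open"
        return "open"

    def allow_claim(self) -> bool:
        return self.state != "open"

    def record_outcome(self, outcome: str) -> None:
        """outcome is 'success', 'transient' or 'permanent'."""
        if outcome == "transient":
            was_half_open = self.state == "half-open"
            self._streak += 1
            if was_half_open or self._streak >= self.threshold:
                self._opened_at = self._clock()  # (re)open and restart the pause
        elif outcome == "success":
            self._streak = 0
            self._opened_at = None
        else:
            self._streak = 0  # assumed: a code fault breaks the transient streak
```

Deriving the state from `_opened_at` and the clock, rather than storing it, avoids a separate timer thread for the open → half-open transition.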
### Config (config.py)
`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`,
`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`.

### Tests
`tests/test_resilience.py` contains 34 tests: preflight (FAIL→queued, cache, force),
classifier (transient/permanent/Retry-After), backoff (growth/cap/Retry-After,
`available_at` gating), launcher transient/permanent finalize, breaker
(open/half-open/closed/re-open, claim blocking).
@@ -4,7 +4,7 @@ import logging
 import threading
 import signal
 from ..config import settings
-from ..db import get_db, get_task_by_repo_branch, update_task_stage
+from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
 from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
 from ..git_worktree import ensure_worktree, get_worktree_path
 from ..qg.checks import QG_CHECKS
@@ -57,7 +57,10 @@ class AgentLauncher:

     def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
         """
-        Launch a Claude CLI agent.
+        Launch a Claude CLI agent directly (legacy synchronous path).
+
+        Kept for backward compatibility (direct callers / existing tests). The
+        ORCH-1 job queue uses launch_job() instead, but both share _spawn().

         Args:
             agent: Agent role (analyst, architect, developer, reviewer, tester)
@@ -68,6 +71,31 @@ class AgentLauncher:
         Returns:
             agent_run_id from DB
         """
+        return self._spawn(agent, repo, task_content, task_id, job_id=None)
+
+    def launch_job(self, job: dict) -> int:
+        """ORCH-1: launch an agent for a claimed queue job.
+
+        Same spawn path as launch(), but threads job['id'] through so the monitor
+        can update the job's status (done / requeue / failed) and link jobs.run_id
+        to the agent_runs row. Returns the agent_run_id.
+        """
+        return self._spawn(
+            job["agent"],
+            job["repo"],
+            job.get("task_content"),
+            job.get("task_id"),
+            job_id=job["id"],
+        )
+
+    def _spawn(self, agent: str, repo: str, task_content: str = None,
+               task_id: int = None, job_id: int = None) -> int:
+        """Shared spawn implementation for launch() and launch_job().
+
+        When job_id is set, the monitor/watchdog drive the jobs table status
+        (ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
+        (B-1 / ORCH-2) are unchanged.
+        """
         config = self.AGENT_CONFIGS.get(agent)
         if not config:
             raise ValueError(f"Unknown agent: {agent}")
@@ -98,6 +126,14 @@ class AgentLauncher:
|
|||||||
run_id = cursor.lastrowid
|
run_id = cursor.lastrowid
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
|
# ORCH-1: link this job to the agent_runs row and stamp started_at.
|
||||||
|
if job_id is not None:
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
|
||||||
|
(run_id, job_id),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
# Prepare output log path
|
# Prepare output log path
|
||||||
output_path = f"/app/data/runs/{run_id}.log"
|
output_path = f"/app/data/runs/{run_id}.log"
|
||||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||||
@@ -154,6 +190,7 @@ class AgentLauncher:
|
|||||||
t = threading.Thread(
|
t = threading.Thread(
|
||||||
target=self._watchdog,
|
target=self._watchdog,
|
||||||
args=(proc.pid, run_id),
|
args=(proc.pid, run_id),
|
||||||
|
kwargs={"job_id": job_id},
|
||||||
daemon=True,
|
daemon=True,
|
||||||
)
|
)
|
||||||
t.start()
|
t.start()
|
||||||
@@ -163,6 +200,7 @@ class AgentLauncher:
|
|||||||
m = threading.Thread(
|
m = threading.Thread(
|
||||||
target=self._monitor_agent,
|
target=self._monitor_agent,
|
||||||
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
|
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
|
||||||
|
kwargs={"job_id": job_id},
|
||||||
daemon=True,
|
daemon=True,
|
||||||
)
|
)
|
||||||
m.start()
|
m.start()
|
||||||
@@ -171,8 +209,13 @@ class AgentLauncher:
|
|||||||
notify_agent_started(run_id, agent, task_id)
|
notify_agent_started(run_id, agent, task_id)
|
||||||
return run_id
|
return run_id
|
||||||
|
|
||||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
|
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
|
||||||
"""Kill agent if it exceeds timeout."""
|
"""Kill agent if it exceeds timeout.
|
||||||
|
|
||||||
|
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||||
|
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||||
|
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
|
||||||
|
"""
|
||||||
import time
|
import time
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = self.AGENT_TIMEOUT
|
timeout = self.AGENT_TIMEOUT
|
||||||
@@ -190,7 +233,7 @@ class AgentLauncher:
|
|||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass # Already finished
|
pass # Already finished
|
||||||
|
|
||||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
|
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||||
"""Wait for agent to finish, commit+push results, update DB.
|
"""Wait for agent to finish, commit+push results, update DB.
|
||||||
|
|
||||||
B-2 fix: stdout already goes straight to the log file via Popen, so we just
|
B-2 fix: stdout already goes straight to the log file via Popen, so we just
|
||||||
@@ -318,6 +361,132 @@ class AgentLauncher:
|
|||||||
if exit_code == 0:
|
if exit_code == 0:
|
||||||
self._try_advance_stage(run_id, agent, repo, branch)
|
self._try_advance_stage(run_id, agent, repo, branch)
|
||||||
|
|
||||||
|
# ORCH-1: drive the job-queue status for queue-launched jobs only.
|
||||||
|
# (Legacy direct launch() has job_id=None and is unaffected.)
|
||||||
|
if job_id is not None:
|
||||||
|
self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)
|
||||||
|
|
||||||
|
def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
|
||||||
|
"""Exponential backoff for transient failures, honouring Retry-After.
|
||||||
|
|
||||||
|
backoff = min(2**transient_attempts * base, cap). If the server sent a
|
||||||
|
Retry-After, use the larger of the two (never poll sooner than asked).
|
||||||
|
"""
|
||||||
|
base = settings.backoff_base_seconds
|
||||||
|
cap = settings.backoff_max_seconds
|
||||||
|
backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||||||
|
if retry_after is not None and retry_after > 0:
|
||||||
|
backoff = max(backoff, min(retry_after, cap))
|
||||||
|
return int(backoff)
|
||||||
|
|
||||||
|
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
|
||||||
|
"""ORCH-1: update the jobs row after the agent process finished.
|
||||||
|
|
||||||
|
exit_code == 0 -> done (and resets the breaker streak via on_outcome).
|
||||||
|
exit_code != 0 -> classify the failure from the run log tail (token-free):
|
||||||
|
- TRANSIENT (429/overload/network): backoff-requeue with available_at in
|
||||||
|
the future + a SEPARATE transient_attempts budget
|
||||||
|
(settings.transient_max_attempts), honouring Retry-After. Reported to
|
||||||
|
the breaker so it opens after N consecutive transient failures.
|
||||||
|
- PERMANENT (code fault): ordinary attempts < max_attempts requeue,
|
||||||
|
otherwise 'failed' + Telegram.
|
||||||
|
"""
|
||||||
|
from ..db import get_job, mark_job
|
||||||
|
from ..error_classifier import classify_log_file
|
||||||
|
try:
|
||||||
|
job = get_job(job_id)
|
||||||
|
if not job:
|
||||||
|
return
|
||||||
|
if exit_code == 0:
|
||||||
|
mark_job(job_id, "done", run_id=run_id)
|
||||||
|
logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
|
||||||
|
self._record_outcome(transient=False, recovered=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Classify the failure from the agent log tail (no token cost).
|
||||||
|
kind, retry_after = "permanent", None
|
||||||
|
log_path = output_path or f"/app/data/runs/{run_id}.log"
|
||||||
|
try:
|
||||||
|
kind, retry_after = classify_log_file(log_path)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if kind == "transient":
|
||||||
|
self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
|
||||||
|
else:
|
||||||
|
self._finalize_permanent(job_id, agent, run_id, exit_code, job)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Job {job_id}: _finalize_job error: {e}")
|
||||||
|
|
||||||
|
def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
|
||||||
|
"""Transient (429/overload/network) failure: requeue with backoff, or mark 'failed' once the transient budget is exhausted."""
|
||||||
|
from ..db import mark_job, mark_job_transient
|
||||||
|
tattempts = job.get("transient_attempts", 0)
|
||||||
|
tmax = settings.transient_max_attempts
|
||||||
|
err = (f"transient (429/overload) agent {agent} exit={exit_code} "
|
||||||
|
f"(run_id={run_id}); retry_after={retry_after}")
|
||||||
|
self._record_outcome(transient=True, recovered=False)
|
||||||
|
if tattempts < tmax:
|
||||||
|
backoff = self._backoff_seconds(tattempts + 1, retry_after)
|
||||||
|
mark_job_transient(job_id, backoff, error=err)
|
||||||
|
logger.warning(
|
||||||
|
f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
|
||||||
|
f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||||
|
logger.error(
|
||||||
|
f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
|
||||||
|
)
|
||||||
|
self._notify_failed(job_id, agent, job, run_id,
|
||||||
|
f"transient (rate-limit) after {tattempts} attempts")
|
||||||
|
|
||||||
|
def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
|
||||||
|
"""Permanent (code-fault) failure -> normal attempts<max requeue, then fail."""
|
||||||
|
from ..db import mark_job
|
||||||
|
attempts = job.get("attempts", 0)
|
||||||
|
max_attempts = job.get("max_attempts", 2)
|
||||||
|
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
|
||||||
|
self._record_outcome(transient=False, recovered=False)
|
||||||
|
if attempts < max_attempts:
|
||||||
|
mark_job(job_id, "queued", run_id=run_id, error=err)
|
||||||
|
logger.warning(
|
||||||
|
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
|
||||||
|
f"requeued (attempt {attempts}/{max_attempts})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||||
|
logger.error(
|
||||||
|
f"Job {job_id} ({agent}) failed permanently after "
|
||||||
|
f"{attempts} attempts (exit={exit_code})"
|
||||||
|
)
|
||||||
|
self._notify_failed(job_id, agent, job, run_id,
|
||||||
|
f"{attempts} attempts (exit={exit_code})")
|
||||||
|
|
||||||
|
def _notify_failed(self, job_id, agent, job, run_id, why):
"""Best-effort Telegram alert for a failed job; notification errors are swallowed."""
|
||||||
|
try:
|
||||||
|
from ..notifications import send_telegram
|
||||||
|
send_telegram(
|
||||||
|
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
|
||||||
|
f"failed: {why}. Logs: /app/data/runs/{run_id}.log"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _record_outcome(self, transient: bool, recovered: bool):
|
||||||
|
"""Forward the run outcome to the circuit breaker (if a worker is wired).
|
||||||
|
|
||||||
|
Decoupled via a settable callback (set by QueueWorker.start) so the launcher
|
||||||
|
does not hard-import the worker (avoids a cycle) and tests can run the
|
||||||
|
launcher standalone.
|
||||||
|
"""
|
||||||
|
cb = getattr(self, "on_outcome", None)
|
||||||
|
if cb:
|
||||||
|
try:
|
||||||
|
cb(transient=transient, recovered=recovered)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||||
try:
|
try:
|
||||||
@@ -416,8 +585,8 @@ class AgentLauncher:
|
|||||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||||
f"docs/work-items/{work_item_id}/12-review.md"
|
f"docs/work-items/{work_item_id}/12-review.md"
|
||||||
)
|
)
|
||||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
|
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
|
||||||
else:
|
else:
|
||||||
from ..notifications import send_telegram
|
from ..notifications import send_telegram
|
||||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
||||||
@@ -446,8 +615,8 @@ class AgentLauncher:
|
|||||||
f"Stage: development\nNote: Tests FAILED. "
|
f"Stage: development\nNote: Tests FAILED. "
|
||||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||||
)
|
)
|
||||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
|
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
|
||||||
else:
|
else:
|
||||||
from ..notifications import send_telegram
|
from ..notifications import send_telegram
|
||||||
from ..plane_sync import set_issue_blocked
|
from ..plane_sync import set_issue_blocked
|
||||||
@@ -478,8 +647,8 @@ class AgentLauncher:
|
|||||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||||
)
|
)
|
||||||
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
|
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
|
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
|
||||||
return
|
return
|
||||||
|
|
||||||
return
|
return
|
||||||
@@ -496,8 +665,8 @@ class AgentLauncher:
|
|||||||
next_agent = get_agent_for_stage(next_stage)
|
next_agent = get_agent_for_stage(next_stage)
|
||||||
if next_agent:
|
if next_agent:
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||||
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
|
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
|
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||||
|
|||||||
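The outcome handling in `_finalize_job`, `_finalize_transient`, and `_finalize_permanent` reduces to a small decision table. The sketch below restates it as a pure function for illustration. `next_job_status` is a hypothetical name that does not appear in the diff, and the defaults (`max_attempts=2`, `transient_max=5`) are assumed from the schema and settings in this change. It also assumes `attempts` has already been incremented by the claim step, as `claim_next_job` does.

```python
def next_job_status(exit_code, kind, attempts, transient_attempts,
                    max_attempts=2, transient_max=5):
    """Return the status a job moves to after its agent process exits.

    Hypothetical helper for illustration only: it mirrors the branch order
    of the finalize methods, it is not a function from the codebase.
    """
    if exit_code == 0:
        return "done"
    if kind == "transient":
        # Separate retry budget; the real code also sets available_at.
        return "queued" if transient_attempts < transient_max else "failed"
    # Permanent (code-fault) failure: ordinary attempts budget.
    return "queued" if attempts < max_attempts else "failed"


print(next_job_status(0, "permanent", attempts=1, transient_attempts=0))
print(next_job_status(1, "permanent", attempts=1, transient_attempts=0))
print(next_job_status(1, "permanent", attempts=2, transient_attempts=0))
print(next_job_status(1, "transient", attempts=1, transient_attempts=5))
```

Under these assumptions a code-fault job runs at most twice before it is marked `failed`, while a rate-limited job can be requeued up to five times without consuming the ordinary budget.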
@@ -16,6 +16,11 @@ class Settings(BaseSettings):
|
|||||||
gitea_owner: str = "admin"
|
gitea_owner: str = "admin"
|
||||||
default_repo: str = "enduro-trails"
|
default_repo: str = "enduro-trails"
|
||||||
|
|
||||||
|
# ORCH-6: multi-repo project registry. JSON array of
|
||||||
|
# {plane_project_id, repo, work_item_prefix, name}.
|
||||||
|
# Empty -> built-in default registry in src/projects.py.
|
||||||
|
projects_json: str = ""
|
||||||
|
|
||||||
# Claude CLI
|
# Claude CLI
|
||||||
claude_bin: str = "/opt/claude-code/bin/claude.exe"
|
claude_bin: str = "/opt/claude-code/bin/claude.exe"
|
||||||
repos_dir: str = "/repos"
|
repos_dir: str = "/repos"
|
||||||
@@ -25,6 +30,29 @@ class Settings(BaseSettings):
|
|||||||
# DB
|
# DB
|
||||||
db_path: str = "/app/data/orchestrator.db"
|
db_path: str = "/app/data/orchestrator.db"
|
||||||
|
|
||||||
|
# ORCH-1 (F-2b): persistent job queue / background worker.
# max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY)
# queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL)
max_concurrency: int = 1
queue_poll_interval: float = 2.0

# ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker.
# preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds);
# the worker does NOT re-run `claude --version` more often
# than this (env ORCH_PREFLIGHT_CACHE_TTL).
# backoff_base_seconds -> base for exponential transient backoff.
# backoff_max_seconds -> ceiling for the transient backoff.
# transient_max_attempts -> retry budget for transient (429/overload/network)
# failures, separate from code-fault `attempts`.
# breaker_threshold -> consecutive transient failures that OPEN the breaker.
# breaker_pause_seconds -> how long the breaker stays open before half-open.
preflight_cache_ttl: int = 45
backoff_base_seconds: int = 10
backoff_max_seconds: int = 600
transient_max_attempts: int = 5
breaker_threshold: int = 3
breaker_pause_seconds: int = 300
|
||||||
|
|
||||||
|
|
||||||
# Telegram notifications
|
# Telegram notifications
|
||||||
telegram_bot_token: str = ""
|
telegram_bot_token: str = ""
|
||||||
|
|||||||
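To make the default backoff settings concrete, here is a standalone restatement of the `_backoff_seconds` formula. It is a sketch: the function name `backoff_seconds` and the keyword defaults are assumptions chosen to match `backoff_base_seconds = 10` and `backoff_max_seconds = 600`, with the settings object replaced by plain arguments.

```python
def backoff_seconds(transient_attempts, retry_after=None, base=10, cap=600):
    """Exponential backoff in seconds, capped, with Retry-After as a floor."""
    backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
    if retry_after is not None and retry_after > 0:
        # Never retry sooner than the server asked, but never exceed the cap.
        backoff = max(backoff, min(retry_after, cap))
    return int(backoff)


# _finalize_transient passes tattempts + 1, so the first retry uses 2**1.
schedule = [backoff_seconds(n) for n in range(1, 6)]
print(schedule)  # [20, 40, 80, 160, 320]

print(backoff_seconds(1, retry_after=45))    # 45: Retry-After beats 20s
print(backoff_seconds(1, retry_after=3600))  # 600: clamped to the cap
```

With `transient_max_attempts = 5` the cap of 600 seconds is never reached by the exponential term alone; it only takes effect when a large Retry-After value is reported.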
288
src/db.py
@@ -40,10 +40,44 @@ def init_db():
|
|||||||
exit_code INTEGER,
|
exit_code INTEGER,
|
||||||
output_path TEXT
|
output_path TEXT
|
||||||
);
|
);
|
||||||
|
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
|
||||||
|
-- return immediately; a background worker claims jobs (respecting
|
||||||
|
-- max_concurrency), spawns the claude agent, and updates the status.
|
||||||
|
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
|
||||||
|
CREATE TABLE IF NOT EXISTS jobs (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
agent TEXT NOT NULL,
|
||||||
|
repo TEXT NOT NULL,
|
||||||
|
task_id INTEGER, -- FK tasks.id (nullable)
|
||||||
|
task_content TEXT, -- written to the agent task_file
|
||||||
|
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
|
||||||
|
attempts INTEGER NOT NULL DEFAULT 0,
|
||||||
|
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||||||
|
run_id INTEGER, -- agent_runs.id once started
|
||||||
|
error TEXT, -- last error message
|
||||||
|
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
|
||||||
|
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
|
||||||
|
created_at TEXT DEFAULT (datetime('now')),
|
||||||
|
started_at TEXT,
|
||||||
|
finished_at TEXT
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
||||||
""")
|
""")
|
||||||
|
# Lightweight migration: add resilience columns to a pre-existing jobs table
|
||||||
|
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||||
|
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||||
|
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_column(conn, table: str, column: str, decl: str):
|
||||||
|
"""Add a column to `table` if it does not already exist (idempotent migration)."""
|
||||||
|
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||||
|
if column not in cols:
|
||||||
|
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
||||||
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
@@ -79,21 +113,261 @@ def update_task_stage(task_id: int, stage: str):
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def get_next_work_item_id(repo: str) -> str:
|
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||||
"""Generate next work item ID (e.g., ET-003)."""
|
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
||||||
|
|
||||||
|
ORCH-6: numbering is per (repo, prefix). The prefix comes from the project
|
||||||
|
registry (proj.work_item_prefix), so orchestrator issues number ORCH-001,
|
||||||
|
ORCH-002 independently of the ET sequence in enduro-trails. Default prefix
|
||||||
|
stays "ET" for backward compatibility with existing callers.
|
||||||
|
"""
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
row = conn.execute(
|
row = conn.execute(
|
||||||
"SELECT work_item_id FROM tasks WHERE repo = ? AND work_item_id IS NOT NULL ORDER BY id DESC LIMIT 1",
|
"SELECT work_item_id FROM tasks "
|
||||||
(repo,),
|
"WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL "
|
||||||
|
"ORDER BY id DESC LIMIT 1",
|
||||||
|
(repo, f"{prefix}-%"),
|
||||||
).fetchone()
|
).fetchone()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
if row and row["work_item_id"]:
|
if row and row["work_item_id"]:
|
||||||
# Parse ET-003 -> 3, increment
|
# Parse <PREFIX>-003 -> 3, increment (keep the existing prefix).
|
||||||
prefix, num = row["work_item_id"].rsplit("-", 1)
|
existing_prefix, num = row["work_item_id"].rsplit("-", 1)
|
||||||
|
prefix = existing_prefix
|
||||||
next_num = int(num) + 1
|
next_num = int(num) + 1
|
||||||
else:
|
else:
|
||||||
prefix = "ET"
|
|
||||||
next_num = 1
|
next_num = 1
|
||||||
|
|
||||||
return f"{prefix}-{next_num:03d}"
|
return f"{prefix}-{next_num:03d}"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-1 (F-2b): job queue helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def enqueue_job(
|
||||||
|
agent: str,
|
||||||
|
repo: str,
|
||||||
|
task_content: str | None = None,
|
||||||
|
task_id: int | None = None,
|
||||||
|
max_attempts: int = 2,
|
||||||
|
) -> int:
|
||||||
|
"""Enqueue a new job (status='queued'). Returns the new job id.
|
||||||
|
|
||||||
|
This is what webhook handlers call instead of launching an agent in-process:
|
||||||
|
it is a fast DB INSERT that returns immediately. The background worker
|
||||||
|
(queue_worker) picks the job up later.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
cursor = conn.execute(
|
||||||
|
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?)",
|
||||||
|
(agent, repo, task_id, task_content, max_attempts),
|
||||||
|
)
|
||||||
|
job_id = cursor.lastrowid
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
return job_id
|
||||||
|
|
||||||
|
|
||||||
|
def claim_next_job() -> dict | None:
|
||||||
|
"""Atomically claim the oldest queued job and mark it 'running'.
|
||||||
|
|
||||||
|
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
|
||||||
|
and we check `rowcount`. If two worker ticks race for the same row, only the
|
||||||
|
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
|
||||||
|
and retries the SELECT. We rely on SQLite's default per-connection transaction
|
||||||
|
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
|
||||||
|
when the queue is empty.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT id FROM jobs WHERE status='queued' "
|
||||||
|
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
||||||
|
"ORDER BY id LIMIT 1"
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return None
|
||||||
|
job_id = row["id"]
|
||||||
|
cur = conn.execute(
|
||||||
|
"UPDATE jobs SET status='running', "
|
||||||
|
"attempts = attempts + 1, started_at = datetime('now') "
|
||||||
|
"WHERE id = ? AND status='queued'",
|
||||||
|
(job_id,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
if cur.rowcount == 1:
|
||||||
|
claimed = conn.execute(
|
||||||
|
"SELECT * FROM jobs WHERE id = ?", (job_id,)
|
||||||
|
).fetchone()
|
||||||
|
return dict(claimed)
|
||||||
|
# Lost the race for this row; loop and try the next queued job.
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
|
||||||
|
error: str | None = None) -> None:
|
||||||
|
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
|
||||||
|
|
||||||
|
Increments `transient_attempts` (separate from the code-fault `attempts`),
|
||||||
|
sets status back to 'queued', and gates re-pickup via `available_at` =
|
||||||
|
now + backoff seconds. started_at/finished_at are cleared.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
sets = [
|
||||||
|
"status='queued'",
|
||||||
|
"transient_attempts = transient_attempts + 1",
|
||||||
|
"available_at = datetime('now', ?)",
|
||||||
|
"started_at = NULL",
|
||||||
|
"finished_at = NULL",
|
||||||
|
]
|
||||||
|
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
|
||||||
|
if error is not None:
|
||||||
|
sets.append("error = ?")
|
||||||
|
params.append(error)
|
||||||
|
params.append(job_id)
|
||||||
|
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def mark_job(
|
||||||
|
job_id: int,
|
||||||
|
status: str,
|
||||||
|
run_id: int | None = None,
|
||||||
|
error: str | None = None,
|
||||||
|
):
|
||||||
|
"""Update a job's status (queued|running|done|failed).
|
||||||
|
|
||||||
|
- run_id (optional): link to the agent_runs row that executed this job.
|
||||||
|
- error (optional): last error message (for failed/retry).
|
||||||
|
- 'done'/'failed' also stamp finished_at.
|
||||||
|
- 'queued' (requeue for retry) clears started_at/finished_at so the next
|
||||||
|
claim treats it as fresh.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
sets = ["status = ?"]
|
||||||
|
params: list = [status]
|
||||||
|
if run_id is not None:
|
||||||
|
sets.append("run_id = ?")
|
||||||
|
params.append(run_id)
|
||||||
|
if error is not None:
|
||||||
|
sets.append("error = ?")
|
||||||
|
params.append(error)
|
||||||
|
if status in ("done", "failed"):
|
||||||
|
sets.append("finished_at = datetime('now')")
|
||||||
|
elif status == "queued":
|
||||||
|
sets.append("started_at = NULL")
|
||||||
|
sets.append("finished_at = NULL")
|
||||||
|
params.append(job_id)
|
||||||
|
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def count_running_jobs() -> int:
|
||||||
|
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||||||
|
conn = get_db()
|
||||||
|
n = conn.execute(
|
||||||
|
"SELECT COUNT(*) FROM jobs WHERE status='running'"
|
||||||
|
).fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
return int(n)
|
||||||
|
|
||||||
|
|
||||||
|
def requeue_running_jobs() -> int:
|
||||||
|
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
died on restart, so put it back to 'queued'. `attempts` is not touched here;
claim_next_job increments it again on the next pickup, so a recovered job
consumes one more attempt. Returns the number of requeued jobs.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
cur = conn.execute(
|
||||||
|
"UPDATE jobs SET status='queued', started_at = NULL "
|
||||||
|
"WHERE status='running'"
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
n = cur.rowcount
|
||||||
|
conn.close()
|
||||||
|
return int(n)
|
||||||
|
|
||||||
|
|
||||||
|
def get_job(job_id: int) -> dict | None:
|
||||||
|
"""Fetch a single job by id."""
|
||||||
|
conn = get_db()
|
||||||
|
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
||||||
|
conn.close()
|
||||||
|
return dict(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
|
def job_status_counts() -> dict:
|
||||||
|
"""Return counts grouped by status (for /queue observability)."""
|
||||||
|
conn = get_db()
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
|
||||||
|
).fetchall()
|
||||||
|
conn.close()
|
||||||
|
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
|
||||||
|
for r in rows:
|
||||||
|
counts[r["status"]] = r["n"]
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
def recent_jobs(limit: int = 10) -> list[dict]:
|
||||||
|
"""Return the most recent jobs (for /queue observability)."""
|
||||||
|
conn = get_db()
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
|
||||||
|
).fetchall()
|
||||||
|
conn.close()
|
||||||
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH-1b (resilience): transient backoff helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
|
||||||
|
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
|
||||||
|
|
||||||
|
Unlike a code-fault requeue, this:
|
||||||
|
- increments `transient_attempts` (a separate budget from code-fault attempts)
|
||||||
|
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
|
||||||
|
up until the backoff window elapses
|
||||||
|
- sets status back to 'queued' and clears started_at/finished_at
|
||||||
|
|
||||||
|
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE jobs SET status='queued', "
|
||||||
|
"transient_attempts = transient_attempts + 1, "
|
||||||
|
"available_at = datetime('now', ? || ' seconds'), "
|
||||||
|
"started_at = NULL, finished_at = NULL, "
|
||||||
|
"error = COALESCE(?, error) "
|
||||||
|
"WHERE id = ?",
|
||||||
|
(f"+{int(round(delay_seconds))}", error, job_id),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
|
||||||
|
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
|
||||||
|
|
||||||
|
delay = min(2**transient_attempts * base, max). If the server sent a
|
||||||
|
Retry-After hint we honour it as a floor (use the larger of the two so we
|
||||||
|
never poll sooner than the server asked).
|
||||||
|
|
||||||
|
`transient_attempts` is the count AFTER this failure (i.e. how many transient
|
||||||
|
failures have occurred), so the first backoff uses 2**1.
|
||||||
|
"""
|
||||||
|
base = getattr(settings, "backoff_base_seconds", 10)
|
||||||
|
cap = getattr(settings, "backoff_max_seconds", 600)
|
||||||
|
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||||||
|
if retry_after is not None and retry_after > 0:
|
||||||
|
return float(min(max(exp, retry_after), cap))
|
||||||
|
return float(exp)
|
||||||
|
|||||||
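The claim step in `claim_next_job` can be tried against an in-memory database. The following is a minimal sketch, assuming a trimmed-down `jobs` table (only the columns the claim touches) and a hypothetical `claim_next(conn)` helper that takes the connection as an argument instead of calling `get_db()`.

```python
import sqlite3

# Reduced schema: only the columns the claim logic reads or writes.
SCHEMA = """
CREATE TABLE jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued',
    attempts INTEGER NOT NULL DEFAULT 0,
    available_at TEXT
);
"""


def claim_next(conn):
    """Claim the oldest eligible queued job, or return None."""
    while True:
        row = conn.execute(
            "SELECT id FROM jobs WHERE status='queued' "
            "AND (available_at IS NULL OR available_at <= datetime('now')) "
            "ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status guard makes the flip safe: a racing claimer gets rowcount 0.
        cur = conn.execute(
            "UPDATE jobs SET status='running', attempts = attempts + 1 "
            "WHERE id = ? AND status='queued'",
            (row["id"],),
        )
        conn.commit()
        if cur.rowcount == 1:
            claimed = conn.execute(
                "SELECT * FROM jobs WHERE id = ?", (row["id"],)
            ).fetchone()
            return dict(claimed)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.execute("INSERT INTO jobs (agent) VALUES ('analyst')")
# Second job is backoff-gated ten minutes into the future.
conn.execute(
    "INSERT INTO jobs (agent, available_at) "
    "VALUES ('developer', datetime('now', '+600 seconds'))"
)
conn.commit()

first = claim_next(conn)   # job 1, now 'running' with attempts == 1
second = claim_next(conn)  # None: job 2 is still inside its backoff window

conn.execute("UPDATE jobs SET available_at = NULL WHERE id = 2")
conn.commit()
third = claim_next(conn)   # job 2 becomes claimable once the gate is lifted
```

The `available_at` comparison works as a plain string comparison because both sides use SQLite's `datetime()` text format.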
87
src/error_classifier.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
"""ORCH-1 resilience: classify an agent failure as transient vs permanent.
|
||||||
|
|
||||||
|
Rate limits / overload / network blips cannot be reliably predicted in advance,
|
||||||
|
so we classify *after the run* by scanning the agent's combined stdout/stderr log
|
||||||
|
(B-2 sends both to /app/data/runs/<run_id>.log).
|
||||||
|
|
||||||
|
- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc.
|
||||||
|
=> backoff + transient retry (separate counter, larger budget).
|
||||||
|
- permanent -> a genuine code fault / agent error
|
||||||
|
=> normal attempts < max_attempts, then 'failed'.
|
||||||
|
|
||||||
|
Also extracts a Retry-After hint (seconds) when the server provided one.
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
|
||||||
|
# Case-insensitive substrings/patterns that signal a transient/rate-limit issue.
|
||||||
|
_TRANSIENT_PATTERNS = [
|
||||||
|
r"\b429\b",
|
||||||
|
r"rate[\s_-]*limit",
|
||||||
|
r"rate_limit_error",
|
||||||
|
r"overloaded",
|
||||||
|
r"overloaded_error",
|
||||||
|
r"too many requests",
|
||||||
|
r"quota",
|
||||||
|
r"insufficient[_\s-]*quota",
|
||||||
|
r"retry[\s-]*after",
|
||||||
|
r"service unavailable",
|
||||||
|
r"\b503\b",
|
||||||
|
r"\b529\b",
|
||||||
|
r"timed out",
|
||||||
|
r"timeout",
|
||||||
|
r"connection (reset|refused|error|aborted)",
|
||||||
|
r"temporarily unavailable",
|
||||||
|
r"econnreset",
|
||||||
|
r"etimedout",
|
||||||
|
]
|
||||||
|
|
||||||
|
_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE)
|
||||||
|
|
||||||
|
# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or
# "retry after 30 seconds". Group 1 captures the integer seconds.
|
||||||
|
_RETRY_AFTER_RE = re.compile(
|
||||||
|
r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def classify_text(text: str) -> str:
|
||||||
|
"""Return 'transient' or 'permanent' for a chunk of log/stderr text."""
|
||||||
|
if not text:
|
||||||
|
return "permanent"
|
||||||
|
return "transient" if _TRANSIENT_RE.search(text) else "permanent"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_retry_after(text: str) -> int | None:
|
||||||
|
"""Return Retry-After seconds if present in the text, else None."""
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
m = _RETRY_AFTER_RE.search(text)
|
||||||
|
if m:
|
||||||
|
try:
|
||||||
|
return int(m.group(1))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]:
|
||||||
|
"""Classify the tail of a log file.
|
||||||
|
|
||||||
|
Reads the last `tail_bytes` of the log (rate-limit messages appear near the
|
||||||
|
end) and returns (classification, retry_after_seconds_or_None).
|
||||||
|
On any read error, treats it as 'permanent' (no special backoff).
|
||||||
|
"""
|
||||||
|
if not path:
|
||||||
|
return "permanent", None
|
||||||
|
try:
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
try:
|
||||||
|
f.seek(-tail_bytes, 2)
|
||||||
|
except OSError:
|
||||||
|
f.seek(0)
|
||||||
|
data = f.read()
|
||||||
|
text = data.decode("utf-8", errors="replace")
|
||||||
|
except Exception:
|
||||||
|
return "permanent", None
|
||||||
|
return classify_text(text), parse_retry_after(text)
|
||||||
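The classification and the Retry-After hint are meant to feed a backoff decision. The sketch below shows one way that could fit together. It uses a reduced pattern list, and `next_delay` with its `base` and `cap` parameters is a hypothetical helper that does not appear in this diff; the launcher's real backoff policy may differ.

```python
from __future__ import annotations

import re

# Reduced pattern set for illustration only; the module's list is longer.
_TRANSIENT_RE = re.compile(
    r"\b429\b|rate[\s_-]*limit|overloaded|timed out", re.IGNORECASE
)
_RETRY_AFTER_RE = re.compile(
    r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)", re.IGNORECASE
)


def classify_text(text: str) -> str:
    """Same decision rule as the module: any transient marker wins."""
    return "transient" if text and _TRANSIENT_RE.search(text) else "permanent"


def parse_retry_after(text: str) -> int | None:
    """Return the Retry-After seconds if the text carries a hint."""
    m = _RETRY_AFTER_RE.search(text or "")
    return int(m.group(1)) if m else None


def next_delay(attempt: int, retry_after: int | None,
               base: int = 30, cap: int = 900) -> int:
    """HYPOTHETICAL helper (not in the diff): pick the backoff in seconds.

    Prefers the server's Retry-After hint; otherwise doubles `base` per
    attempt. Both paths are clamped to `cap`.
    """
    if retry_after is not None:
        return min(retry_after, cap)
    return min(base * 2 ** attempt, cap)


log_tail = 'API error 429: {"type": "rate_limit_error", "retry_after": 45}'
kind = classify_text(log_tail)
hint = parse_retry_after(log_tail)
delay = next_delay(attempt=2, retry_after=hint)
```

Preferring the server hint over the exponential schedule avoids retrying earlier than the API asked for, while the cap keeps a bad hint from parking a job indefinitely.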
34
src/main.py
@@ -51,7 +51,25 @@ async def lifespan(app: FastAPI):
             except Exception:
                 pass
         log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
-    yield
+
+    # ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
+    # worker that died on the previous restart -> put it back to 'queued' so the
+    # worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
+    from .db import requeue_running_jobs
+    requeued = requeue_running_jobs()
+    if requeued:
+        log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
+
+    # Start the background job-queue worker (ORCH-1).
+    from .queue_worker import worker
+    worker.start()
+
+    try:
+        yield
+    finally:
+        # Graceful shutdown of the worker (running agents keep going; their jobs
+        # are requeued on next start via queue-recovery if the process dies).
+        worker.stop()


 app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
@@ -73,3 +91,17 @@ async def status():
     ).fetchall()
     conn.close()
     return {"active_tasks": [dict(t) for t in tasks]}
+
+
+@app.get("/queue")
+async def queue():
+    """ORCH-1: job-queue observability — status counts + recent jobs."""
+    from .db import job_status_counts, recent_jobs
+    from .queue_worker import worker
+    return {
+        "counts": job_status_counts(),
+        "max_concurrency": worker.max_concurrency,
+        "poll_interval": worker.poll_interval,
+        "resilience": worker.status(),
+        "recent": recent_jobs(10),
+    }
@@ -11,6 +11,35 @@ PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
 WORKSPACE = settings.plane_workspace_slug
 PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"

+
+def _resolve_project_id(work_item_id: str = None, project_id: str = None) -> str:
+    """ORCH-6: resolve the Plane project id for a sync call.
+
+    Priority:
+      1. explicit project_id arg (caller already knows the project),
+      2. project derived from the task's repo in the DB (by work_item_id),
+      3. legacy default PROJECT_ID (enduro) for backward compatibility.
+    """
+    if project_id:
+        return project_id
+    if work_item_id:
+        try:
+            from .db import get_db
+            from .projects import get_project_by_repo
+            conn = get_db()
+            row = conn.execute(
+                "SELECT repo FROM tasks WHERE work_item_id = ? ORDER BY id DESC LIMIT 1",
+                (work_item_id,),
+            ).fetchone()
+            conn.close()
+            if row and row[0]:
+                proj = get_project_by_repo(row[0])
+                if proj:
+                    return proj.plane_project_id
+        except Exception as e:
+            logger.debug(f"_resolve_project_id fallback for {work_item_id}: {e}")
+    return PROJECT_ID
+
 # Plane state IDs
 PLANE_STATES = {
     "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
@@ -36,8 +65,9 @@ STAGE_TO_STATE = {
 }


-def find_issue_id(work_item_id: str) -> str | None:
+def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
     """Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
+    project_id = _resolve_project_id(work_item_id, project_id)
     # Primary: lookup from DB (plane_issue_id column)
     try:
         from .db import get_db
@@ -52,7 +82,7 @@ def find_issue_id(work_item_id: str) -> str | None:
         logger.debug(f"DB lookup failed for {work_item_id}: {e}")

     # Fallback: search via Plane API
-    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/"
+    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
     try:
         # First try search by work_item_id
         resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10)
@@ -83,18 +113,19 @@ def find_issue_id(work_item_id: str) -> str | None:
     return None


-def update_issue_state(work_item_id: str, stage: str):
+def update_issue_state(work_item_id: str, stage: str, project_id: str = None):
     """Update Plane issue state based on orchestrator stage."""
     state_id = STAGE_TO_STATE.get(stage)
     if not state_id:
         return

-    issue_id = find_issue_id(work_item_id)
+    project_id = _resolve_project_id(work_item_id, project_id)
+    issue_id = find_issue_id(work_item_id, project_id)
     if not issue_id:
         logger.warning(f"Issue not found in Plane for {work_item_id}")
         return

-    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/"
+    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
     try:
         resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
         resp.raise_for_status()
@@ -103,14 +134,15 @@ def update_issue_state(work_item_id: str, stage: str):
         logger.error(f"Failed to update Plane state for {work_item_id}: {e}")


-def add_comment(work_item_id: str, text: str):
+def add_comment(work_item_id: str, text: str, project_id: str = None):
     """Add a comment to Plane issue."""
-    issue_id = find_issue_id(work_item_id)
+    project_id = _resolve_project_id(work_item_id, project_id)
+    issue_id = find_issue_id(work_item_id, project_id)
     if not issue_id:
         logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment")
         return

-    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/comments/"
+    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/comments/"
     html = f"<p>{text}</p>"
     try:
         resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10)
@@ -121,33 +153,34 @@ def add_comment(work_item_id: str, text: str):



-def set_issue_needs_input(work_item_id: str):
+def set_issue_needs_input(work_item_id: str, project_id: str = None):
     """Set issue to 'Needs Input' state — waiting for stakeholder response."""
-    _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"])
+    _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"], project_id)


-def set_issue_in_review(work_item_id: str):
+def set_issue_in_review(work_item_id: str, project_id: str = None):
     """Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
-    _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"])
+    _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"], project_id)


-def set_issue_blocked(work_item_id: str):
+def set_issue_blocked(work_item_id: str, project_id: str = None):
     """Set issue to 'Blocked' state — manual intervention needed."""
-    _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"])
+    _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)


-def set_issue_in_progress(work_item_id: str):
+def set_issue_in_progress(work_item_id: str, project_id: str = None):
     """Set issue to 'In Progress' state — agent working."""
-    _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"])
+    _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)


-def _set_issue_state_direct(work_item_id: str, state_id: str):
+def _set_issue_state_direct(work_item_id: str, state_id: str, project_id: str = None):
     """Set issue state directly by state_id."""
-    issue_id = find_issue_id(work_item_id)
+    project_id = _resolve_project_id(work_item_id, project_id)
+    issue_id = find_issue_id(work_item_id, project_id)
     if not issue_id:
         logger.warning(f"Issue not found in Plane for {work_item_id}")
         return
-    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/"
+    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
     try:
         resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
         resp.raise_for_status()
@@ -156,9 +189,10 @@ def _set_issue_state_direct(work_item_id: str, state_id: str):
         logger.error(f"Failed to update Plane state for {work_item_id}: {e}")


-def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None):
+def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None, project_id: str = None):
     """Notify Plane about stage transition with links."""
-    update_issue_state(work_item_id, new_stage)
+    project_id = _resolve_project_id(work_item_id, project_id)
+    update_issue_state(work_item_id, new_stage, project_id)

     msg = f"🔄 Stage: {old_stage} → {new_stage}"
     if agent:
@@ -193,15 +227,16 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
     except Exception:
         pass

-    add_comment(work_item_id, msg)
+    add_comment(work_item_id, msg, project_id)


-def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str):
+def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
     """Notify Plane about QG failure."""
-    add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check} — {reason}")
+    add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check} — {reason}", project_id)


-def notify_done(work_item_id: str):
+def notify_done(work_item_id: str, project_id: str = None):
     """Mark issue as Done in Plane."""
-    update_issue_state(work_item_id, "done")
-    add_comment(work_item_id, "✅ Task completed! PR merged and deployed.")
+    project_id = _resolve_project_id(work_item_id, project_id)
+    update_issue_state(work_item_id, "done", project_id)
+    add_comment(work_item_id, "✅ Task completed! PR merged and deployed.", project_id)
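The three-step priority in `_resolve_project_id` (explicit argument, then the task's repo, then the legacy default) can be exercised in isolation. The sketch below is a simplified stand-in, not the function from the diff: the dictionaries `TASK_REPOS` and `REPO_TO_PROJECT`, the ids, and the name `resolve_project_id` are all hypothetical and replace the `tasks` table lookup and the project registry.

```python
from __future__ import annotations

# Hypothetical stand-ins for the legacy default, the tasks table and the registry.
DEFAULT_PROJECT_ID = "default-project"
TASK_REPOS = {"ORCH-7": "orchestrator"}             # work_item_id -> repo
REPO_TO_PROJECT = {"orchestrator": "orch-project"}  # repo -> Plane project id


def resolve_project_id(work_item_id: str | None = None,
                       project_id: str | None = None) -> str:
    """Resolve a project id using the same priority order as the diff."""
    # 1. The caller already knows the project.
    if project_id:
        return project_id
    # 2. Derive it from the repo recorded for the work item, if any.
    repo = TASK_REPOS.get(work_item_id) if work_item_id else None
    if repo and repo in REPO_TO_PROJECT:
        return REPO_TO_PROJECT[repo]
    # 3. Fall back to the legacy single-project default.
    return DEFAULT_PROJECT_ID


explicit = resolve_project_id("ORCH-7", "explicit-project")
derived = resolve_project_id("ORCH-7")
fallback = resolve_project_id("ET-001")
```

Because every public sync function resolves the id once and passes it down, a call such as `notify_done("ORCH-7")` would reach the orchestrator project without any caller having to change.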
106
src/preflight.py
Normal file
@@ -0,0 +1,106 @@
"""ORCH-1 resilience: cheap preflight check (CLI / network available?).

Goal: before the worker claims a job, confirm the claude CLI binary and runtime
are reachable WITHOUT spending any tokens. We only do local/cheap checks:

  1. os.path.exists(CLAUDE_BIN)        -- instant
  2. `claude --version` (timeout 5s)   -- spawns CLI, does NOT call the API

The result is cached for `preflight_cache_ttl` seconds so we do not re-run
`claude --version` on every worker tick.

🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
rate limit and add latency. Preflight is local-only.
"""
import os
import time
import logging
import subprocess

from .config import settings

logger = logging.getLogger("orchestrator.preflight")

_VERSION_TIMEOUT = 5


class _PreflightCache:
    def __init__(self):
        self.ts: float = 0.0
        self.ok: bool = False
        self.reason: str = "not checked yet"


_cache = _PreflightCache()


def _claude_bin() -> str:
    """Resolve the claude binary preflight should check.

    Must match the binary the launcher actually spawns. The launcher hardcodes
    AgentLauncher.CLAUDE_BIN for the real Popen, so we prefer that; we only fall
    back to settings.claude_bin / a default if it is somehow unset. (Note: the
    container's ORCH_CLAUDE_BIN may point elsewhere; preflight follows the path
    that is genuinely executed, not the unused env override.)
    """
    try:
        from .agents.launcher import AgentLauncher
        launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None)
        if launcher_bin and os.path.exists(launcher_bin):
            return launcher_bin
        # Launcher path not present -> fall back to configured/default.
        return launcher_bin or getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
    except Exception:
        return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"


def _run_version(bin_path: str) -> tuple[bool, str]:
    """`claude --version` — proves the CLI runs without touching the API."""
    try:
        r = subprocess.run(
            [bin_path, "--version"],
            capture_output=True,
            text=True,
            timeout=_VERSION_TIMEOUT,
        )
        if r.returncode == 0:
            return True, (r.stdout or r.stderr or "").strip()[:120] or "ok"
        return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}"
    except subprocess.TimeoutExpired:
        return False, f"--version timed out after {_VERSION_TIMEOUT}s"
    except FileNotFoundError:
        return False, "claude binary not found (FileNotFoundError)"
    except Exception as e:  # pragma: no cover - defensive
        return False, f"--version error: {e}"


def _compute() -> tuple[bool, str]:
    bin_path = _claude_bin()
    if not os.path.exists(bin_path):
        return False, f"CLAUDE_BIN not found: {bin_path}"
    return _run_version(bin_path)


def check(force: bool = False) -> tuple[bool, str]:
    """Return (ok, reason). Cached for preflight_cache_ttl seconds.

    force=True bypasses the cache (used by the breaker half-open probe / tests).
    """
    now = time.time()
    ttl = settings.preflight_cache_ttl
    if not force and _cache.ts > 0 and (now - _cache.ts) < ttl:
        return _cache.ok, _cache.reason
    ok, reason = _compute()
    _cache.ts = now
    _cache.ok = ok
    _cache.reason = reason
    if not ok:
        logger.warning(f"Preflight FAIL: {reason}")
    return ok, reason


def reset_cache() -> None:
    """Invalidate the cache (tests / forced recheck)."""
    _cache.ts = 0.0
    _cache.ok = False
    _cache.reason = "reset"
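The caching rule in `check()` (reuse the last result until the TTL elapses, unless `force=True`) is easier to test when the probe and the clock are injectable. The sketch below illustrates that idea under assumptions: the `CachedCheck` class, its `probe` and `clock` parameters, and `fake_probe` are hypothetical names and are not part of `src/preflight.py`, which uses module-level state and `time.time()` directly.

```python
import time
from typing import Callable, Tuple


class CachedCheck:
    """Cache the (ok, reason) result of an expensive probe for `ttl` seconds."""

    def __init__(self, probe: Callable[[], Tuple[bool, str]], ttl: float,
                 clock: Callable[[], float] = time.time):
        self._probe = probe
        self._ttl = ttl
        self._clock = clock
        self._ts = 0.0  # 0.0 means "never checked"
        self._result: Tuple[bool, str] = (False, "not checked yet")

    def check(self, force: bool = False) -> Tuple[bool, str]:
        now = self._clock()
        fresh = self._ts > 0 and (now - self._ts) < self._ttl
        if fresh and not force:
            return self._result  # served from cache, probe not called
        self._result = self._probe()
        self._ts = now
        return self._result


calls = []


def fake_probe() -> Tuple[bool, str]:
    """Stand-in for `claude --version`; records each invocation."""
    calls.append(1)
    return True, "ok"


now = [100.0]  # mutable fake clock
cached = CachedCheck(fake_probe, ttl=30, clock=lambda: now[0])
cached.check()   # first call runs the probe
cached.check()   # second call is served from the cache
now[0] += 31     # let the TTL elapse
cached.check()   # probe runs again
```

With a fake clock the TTL behaviour can be asserted without sleeping, which is the main reason to inject it.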
127
src/projects.py
Normal file
@@ -0,0 +1,127 @@
"""ORCH-6: Project registry — map Plane project id -> repo / work-item prefix.

Root cause of the 2026-06-02 incident: the Plane webhook listened to the whole
workspace and hardcoded ``repo = settings.default_repo`` (enduro-trails). Every
issue from any project was funneled into one repo with one prefix (ET).

This module introduces a small registry keyed by the Plane project uuid so the
orchestrator can:
  * filter webhooks by project (ignore unknown projects),
  * resolve the gitea repo + work-item prefix for a known project,
  * route Plane sync (state/comment) into the issue's own project.

Source of truth: ``settings.projects_json`` (a JSON array set via the
``ORCH_PROJECTS_JSON`` env var). If unset/empty/invalid, a built-in default
registry is used so the system works out of the box.
"""

import json
import logging
from dataclasses import dataclass

from .config import settings

logger = logging.getLogger("orchestrator.projects")


@dataclass(frozen=True)
class ProjectConfig:
    plane_project_id: str   # uuid of the Plane project (registry key)
    repo: str               # gitea repo name (== folder under /repos)
    work_item_prefix: str   # ET / ORCH
    name: str               # human-readable label


# Built-in default registry (used when ORCH_PROJECTS_JSON is empty/invalid).
# Keep enduro-trails first so existing behaviour is the safe default.
_DEFAULT_PROJECTS = [
    ProjectConfig(
        plane_project_id="7a79f0a9-5278-49cd-9007-9a338f238f9c",
        repo="enduro-trails",
        work_item_prefix="ET",
        name="enduro-trails",
    ),
    ProjectConfig(
        plane_project_id="8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a",
        repo="orchestrator",
        work_item_prefix="ORCH",
        name="orchestrator",
    ),
]


def _parse_projects_json(raw: str) -> list[ProjectConfig] | None:
    """Parse ORCH_PROJECTS_JSON. Returns None if empty/invalid (-> use default)."""
    if not raw or not raw.strip():
        return None
    try:
        data = json.loads(raw)
    except (ValueError, TypeError) as e:
        logger.error(f"ORCH_PROJECTS_JSON is not valid JSON, falling back to default: {e}")
        return None
    if not isinstance(data, list):
        logger.error("ORCH_PROJECTS_JSON must be a JSON array, falling back to default")
        return None

    parsed: list[ProjectConfig] = []
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            logger.error(f"ORCH_PROJECTS_JSON[{i}] is not an object, skipping")
            continue
        try:
            parsed.append(
                ProjectConfig(
                    plane_project_id=str(item["plane_project_id"]),
                    repo=str(item["repo"]),
                    work_item_prefix=str(item["work_item_prefix"]),
                    name=str(item.get("name", item["repo"])),
                )
            )
        except KeyError as e:
            logger.error(f"ORCH_PROJECTS_JSON[{i}] missing required key {e}, skipping")
            continue
    if not parsed:
        logger.error("ORCH_PROJECTS_JSON produced no valid entries, falling back to default")
        return None
    return parsed


def _load_projects() -> list[ProjectConfig]:
    parsed = _parse_projects_json(getattr(settings, "projects_json", "") or "")
    if parsed is not None:
        logger.info(f"Project registry loaded from ORCH_PROJECTS_JSON: {len(parsed)} project(s)")
        return parsed
    return list(_DEFAULT_PROJECTS)


# Module-level registry, built once at import.
PROJECTS: list[ProjectConfig] = _load_projects()
_BY_PLANE_ID: dict[str, ProjectConfig] = {p.plane_project_id: p for p in PROJECTS}
_BY_REPO: dict[str, ProjectConfig] = {p.repo: p for p in PROJECTS}


def get_project_by_plane_id(plane_project_id: str) -> ProjectConfig | None:
    """Resolve project config by Plane project uuid. None if unknown."""
    if not plane_project_id:
        return None
    return _BY_PLANE_ID.get(plane_project_id)


def get_project_by_repo(repo: str) -> ProjectConfig | None:
    """Resolve project config by gitea repo name. None if unknown."""
    if not repo:
        return None
    return _BY_REPO.get(repo)


def known_plane_project_ids() -> set[str]:
    """Set of Plane project ids the orchestrator is configured to handle."""
    return set(_BY_PLANE_ID.keys())


def reload_projects() -> None:
    """Rebuild the registry from current settings (used by tests)."""
    global PROJECTS, _BY_PLANE_ID, _BY_REPO
    PROJECTS = _load_projects()
    _BY_PLANE_ID = {p.plane_project_id: p for p in PROJECTS}
    _BY_REPO = {p.repo: p for p in PROJECTS}
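The registry expects `ORCH_PROJECTS_JSON` to be a JSON array of objects with `plane_project_id`, `repo` and `work_item_prefix`, plus an optional `name` that defaults to the repo. The sketch below shows what such a value could look like and how an incomplete entry is skipped. The UUID and repo names are made up for illustration, and the parsing loop is a trimmed copy of the module's logic without logging or the default fallback.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ProjectConfig:
    plane_project_id: str
    repo: str
    work_item_prefix: str
    name: str


# Hypothetical ORCH_PROJECTS_JSON value; the id and repo names are made up.
raw = """
[
  {"plane_project_id": "00000000-0000-0000-0000-000000000001",
   "repo": "demo-repo", "work_item_prefix": "DEMO"},
  {"repo": "incomplete-entry"}
]
"""

projects = []
for item in json.loads(raw):
    try:
        projects.append(ProjectConfig(
            plane_project_id=str(item["plane_project_id"]),
            repo=str(item["repo"]),
            work_item_prefix=str(item["work_item_prefix"]),
            name=str(item.get("name", item["repo"])),  # name defaults to repo
        ))
    except KeyError:
        continue  # entry lacks a required key: skipped, as in the module

# Lookup table keyed by Plane project id, mirroring _BY_PLANE_ID.
by_plane_id = {p.plane_project_id: p for p in projects}
```

Skipping a bad entry rather than rejecting the whole array means one typo does not disable routing for every other project.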
@@ -175,11 +175,15 @@ def check_analysis_approved(repo: str, work_item_id: str, branch: str | None = N
     # Check for :approved: comment via Plane API
     try:
         from ..plane_sync import find_issue_id, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
-        issue_id = find_issue_id(work_item_id)
+        from ..projects import get_project_by_repo
+        # ORCH-6: verify approval in the issue's own Plane project.
+        _proj = get_project_by_repo(repo)
+        _pid = _proj.plane_project_id if _proj else PROJECT_ID
+        issue_id = find_issue_id(work_item_id, _pid)
         if not issue_id:
             return False, "Cannot find Plane issue to verify approval"

-        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/comments/"
+        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/comments/"
         resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
         resp.raise_for_status()
         comments = resp.json()
246
src/queue_worker.py
Normal file
@@ -0,0 +1,246 @@
|
|||||||
|
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
|
||||||
|
|
||||||
|
A single background thread polls the `jobs` table and spawns agents:
|
||||||
|
|
||||||
|
while running:
|
||||||
|
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
|
||||||
|
if not preflight.ok: sleep; continue # CLI/net down -> wait
|
||||||
|
while count_running_jobs() < max_concurrency:
|
||||||
|
job = claim_next_job() # atomic queued -> running (available_at-gated)
|
||||||
|
if not job: break
|
||||||
|
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
||||||
|
sleep(poll_interval)
|
||||||
|
|
||||||
|
Resilience (ДОПОЛНЕНИЕ):
|
||||||
|
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
|
||||||
|
B/C. The launcher classifies failures (transient vs permanent) and applies
|
||||||
|
backoff via available_at; the worker only needs to honour available_at
|
||||||
|
(claim_next_job does) and react to transient outcomes via the breaker.
|
||||||
|
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
|
||||||
|
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
|
||||||
|
|
||||||
|
Design: plain daemon thread + threading.Event (the launcher already manages its
|
||||||
|
own monitor/watchdog threads + blocking Popen).
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from .config import settings
|
||||||
|
from .db import claim_next_job, count_running_jobs
|
||||||
|
from .agents.launcher import launcher
|
||||||
|
from . import preflight
|
||||||
|
|
||||||
|
logger = logging.getLogger("orchestrator.queue_worker")
|
||||||
|
|
||||||
|
|
||||||
|
class CircuitBreaker:
|
||||||
|
"""Trips after `threshold` consecutive transient failures.
|
||||||
|
|
||||||
|
States: closed -> (threshold transient) -> open -> (after pause) half-open
|
||||||
|
-> (recovered) closed | (transient again) open.
|
||||||
|
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
|
||||||
|
guards the counters).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, threshold: int = None, pause_seconds: int = None):
|
||||||
|
self.threshold = threshold if threshold is not None else settings.breaker_threshold
|
||||||
|
self.pause_seconds = (
|
||||||
|
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
|
||||||
|
)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self.state = "closed" # closed | open | half-open
|
||||||
|
self.consecutive_transient = 0
|
||||||
|
self.opened_at = 0.0
|
||||||
|
self._notify = None # optional callable(message) for alerts
|
||||||
|
|
||||||
|
def set_notifier(self, fn):
|
||||||
|
self._notify = fn
|
||||||
|
|
||||||
|
def record_transient(self):
|
||||||
|
with self._lock:
|
||||||
|
self.consecutive_transient += 1
|
||||||
|
if self.state == "half-open":
|
||||||
|
# Probe failed -> re-open.
|
||||||
|
self._open("circuit re-opened: probe job hit transient again")
|
||||||
|
elif self.consecutive_transient >= self.threshold and self.state == "closed":
|
||||||
|
self._open(
|
||||||
|
f"circuit OPEN: {self.consecutive_transient} consecutive "
|
||||||
|
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def record_recovered(self):
|
||||||
|
with self._lock:
|
||||||
|
self.consecutive_transient = 0
|
||||||
|
if self.state in ("half-open", "open"):
|
||||||
|
self.state = "closed"
|
||||||
|
logger.info("Circuit CLOSED: recovered")
|
||||||
|
|
||||||
|
def record_permanent(self):
|
||||||
|
# A clean permanent (code-fault) failure breaks the transient streak.
|
||||||
|
with self._lock:
|
||||||
|
self.consecutive_transient = 0
|
||||||
|
|
||||||
|
def _open(self, msg: str):
|
||||||
|
self.state = "open"
|
||||||
|
self.opened_at = time.time()
|
||||||
|
logger.warning(msg)
|
||||||
|
if self._notify:
|
||||||
|
try:
|
||||||
|
self._notify(f"\U0001f534 {msg}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def allow_claim(self) -> bool:
|
||||||
|
"""Return True if the worker may attempt to claim/launch a job now.
|
||||||
|
|
||||||
|
- closed -> yes.
|
||||||
|
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
|
||||||
|
- half-open -> yes (the single probe).
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
if self.state == "closed":
|
||||||
|
return True
|
||||||
|
if self.state == "open":
|
||||||
|
if (time.time() - self.opened_at) >= self.pause_seconds:
|
||||||
|
self.state = "half-open"
|
||||||
|
logger.info("Circuit HALF-OPEN: probing one job")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
# half-open: allow the probe.
|
||||||
|
return True
|
||||||
|
|
||||||
|
def snapshot(self) -> dict:
|
||||||
|
with self._lock:
|
||||||
|
remaining = 0
|
||||||
|
if self.state == "open":
|
||||||
|
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
|
||||||
|
return {
|
||||||
|
"state": self.state,
|
||||||
|
"consecutive_transient": self.consecutive_transient,
|
||||||
|
"pause_remaining_s": remaining,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class QueueWorker:
|
||||||
|
"""Background worker that drains the persistent job queue (with resilience)."""
|
||||||
|
|
||||||
|
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
|
||||||
|
breaker: CircuitBreaker = None):
|
||||||
|
self.max_concurrency = (
|
||||||
|
max_concurrency if max_concurrency is not None else settings.max_concurrency
|
||||||
|
)
|
||||||
|
self.poll_interval = (
|
||||||
|
poll_interval if poll_interval is not None else settings.queue_poll_interval
|
||||||
|
)
|
||||||
|
self.breaker = breaker or CircuitBreaker()
|
||||||
|
self.last_preflight_ok = True
|
||||||
|
self.last_preflight_reason = "not checked"
|
||||||
|
self._stop = threading.Event()
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
|
||||||
|
# --- circuit breaker outcome callback wired into the launcher ----------
|
||||||
|
def _on_outcome(self, transient: bool, recovered: bool):
|
||||||
|
if recovered:
|
||||||
|
self.breaker.record_recovered()
|
||||||
|
elif transient:
|
||||||
|
self.breaker.record_transient()
|
||||||
|
else:
|
||||||
|
self.breaker.record_permanent()
|
||||||
|
|
||||||
|
def _drain_once(self):
|
||||||
|
"""Claim and launch jobs until concurrency is full or the queue is empty.
|
||||||
|
|
||||||
|
Gated by the circuit breaker and preflight: if the breaker is open (and
|
||||||
|
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
|
||||||
|
queued and no CLI/tokens are touched.
|
||||||
|
"""
|
||||||
|
if not self.breaker.allow_claim():
|
||||||
|
return
|
||||||
|
ok, reason = preflight.check()
|
||||||
|
self.last_preflight_ok = ok
|
||||||
|
self.last_preflight_reason = reason
|
||||||
|
if not ok:
|
||||||
|
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
|
||||||
|
return
|
||||||
|
|
||||||
|
# In half-open we only probe a single job, regardless of max_concurrency.
|
||||||
|
half_open = self.breaker.snapshot()["state"] == "half-open"
|
||||||
|
launched = 0
|
||||||
|
while not self._stop.is_set():
|
||||||
|
if half_open and launched >= 1:
|
||||||
|
return
|
||||||
|
if count_running_jobs() >= self.max_concurrency:
|
||||||
|
return
|
||||||
|
job = claim_next_job()
|
||||||
|
if not job:
|
||||||
|
return
|
||||||
|
launched += 1
|
||||||
|
try:
|
||||||
|
run_id = launcher.launch_job(job)
|
||||||
|
logger.info(
|
||||||
|
f"Worker launched job {job['id']} ({job['agent']}, "
|
||||||
|
f"repo {job['repo']}) -> run_id={run_id}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
# Launch itself failed (e.g. repo missing): requeue while attempts remain,
|
||||||
|
# otherwise mark the job failed so it does not wedge as 'running' forever.
|
||||||
|
logger.error(f"Worker failed to launch job {job['id']}: {e}")
|
||||||
|
try:
|
||||||
|
from .db import get_job, mark_job
|
||||||
|
|
||||||
|
j = get_job(job["id"])
|
||||||
|
attempts = j.get("attempts", 0) if j else 0
|
||||||
|
max_attempts = j.get("max_attempts", 2) if j else 2
|
||||||
|
if attempts < max_attempts:
|
||||||
|
mark_job(job["id"], "queued", error=f"launch error: {e}")
|
||||||
|
else:
|
||||||
|
mark_job(job["id"], "failed", error=f"launch error: {e}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
logger.info(
|
||||||
|
f"Queue worker started (max_concurrency={self.max_concurrency}, "
|
||||||
|
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
|
||||||
|
)
|
||||||
|
while not self._stop.is_set():
|
||||||
|
try:
|
||||||
|
self._drain_once()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Queue worker loop error: {e}")
|
||||||
|
self._stop.wait(self.poll_interval)
|
||||||
|
logger.info("Queue worker stopped")
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
if self._thread and self._thread.is_alive():
|
||||||
|
return
|
||||||
|
# Wire breaker alerting + launcher outcome callback.
|
||||||
|
try:
|
||||||
|
from .notifications import send_telegram
|
||||||
|
self.breaker.set_notifier(send_telegram)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
launcher.on_outcome = self._on_outcome
|
||||||
|
self._stop.clear()
|
||||||
|
self._thread = threading.Thread(
|
||||||
|
target=self._run, name="queue-worker", daemon=True
|
||||||
|
)
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def stop(self, timeout: float = 5.0):
|
||||||
|
self._stop.set()
|
||||||
|
if self._thread:
|
||||||
|
self._thread.join(timeout=timeout)
|
||||||
|
|
||||||
|
def status(self) -> dict:
|
||||||
|
"""Resilience snapshot for /queue."""
|
||||||
|
return {
|
||||||
|
"breaker": self.breaker.snapshot(),
|
||||||
|
"preflight_ok": self.last_preflight_ok,
|
||||||
|
"preflight_reason": self.last_preflight_reason,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton used by the FastAPI lifespan.
|
||||||
|
worker = QueueWorker()
|
||||||
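The gating described in the `allow_claim` docstring above (closed, open with a pause, then a single half-open probe) can be illustrated with a minimal, self-contained sketch. This is not the code from `queue_worker.py`: the class name `MiniBreaker`, the injectable `clock` argument, and the exact rule for opening the circuit (a failed probe, or `threshold` transient failures in a row) are assumptions made for illustration only.

```python
import threading
import time


class MiniBreaker:
    """Toy three-state breaker: closed -> open -> half-open -> closed."""

    def __init__(self, threshold=3, pause_seconds=60.0, clock=time.time):
        self.threshold = threshold
        self.pause_seconds = pause_seconds
        self._clock = clock  # hypothetical hook so the example can fake time
        self._lock = threading.Lock()
        self.state = "closed"
        self.consecutive_transient = 0
        self.opened_at = 0.0

    def record_transient(self):
        with self._lock:
            self.consecutive_transient += 1
            # Assumed rule: a failed probe, or too many failures in a row,
            # (re)opens the circuit and restarts the pause.
            if self.state == "half-open" or self.consecutive_transient >= self.threshold:
                self.state = "open"
                self.opened_at = self._clock()

    def record_recovered(self):
        with self._lock:
            self.consecutive_transient = 0
            self.state = "closed"

    def allow_claim(self):
        with self._lock:
            if self.state == "open":
                if self._clock() - self.opened_at < self.pause_seconds:
                    return False  # still cooling down
                self.state = "half-open"  # pause elapsed: allow one probe
            return True


now = [1000.0]
breaker = MiniBreaker(threshold=2, pause_seconds=30.0, clock=lambda: now[0])
breaker.record_transient()
breaker.record_transient()        # second failure in a row opens the circuit
blocked = breaker.allow_claim()   # inside the pause window
now[0] += 30.0                    # pause elapsed
probe = breaker.allow_claim()     # transitions to half-open
state_after_probe = breaker.state
breaker.record_recovered()        # probe succeeded, circuit closes
```

Injecting the clock keeps the sketch deterministic; the diff itself reads `time.time()` directly.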
@@ -10,12 +10,13 @@ import httpx
|
|||||||
from fastapi import APIRouter, Request, HTTPException
|
from fastapi import APIRouter, Request, HTTPException
|
||||||
|
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||||
from ..stages import get_next_stage, get_agent_for_stage
|
from ..stages import get_next_stage, get_agent_for_stage
|
||||||
from ..qg.checks import check_ci_green, check_review_approved
|
from ..qg.checks import check_ci_green, check_review_approved
|
||||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||||
from ..agents.launcher import launcher
|
from ..agents.launcher import launcher
|
||||||
from ..plane_sync import notify_stage_change as plane_notify_stage
|
from ..plane_sync import notify_stage_change as plane_notify_stage
|
||||||
|
from ..projects import get_project_by_repo
|
||||||
|
|
||||||
logger = logging.getLogger("orchestrator.webhooks.gitea")
|
logger = logging.getLogger("orchestrator.webhooks.gitea")
|
||||||
|
|
||||||
@@ -84,6 +85,11 @@ async def handle_push(payload: dict):
|
|||||||
|
|
||||||
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
|
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
|
||||||
|
|
||||||
|
# ORCH-6: ignore pushes to repos outside the project registry.
|
||||||
|
if not get_project_by_repo(repo_name):
|
||||||
|
logger.info(f"Gitea push: ignoring unknown repo '{repo_name}'")
|
||||||
|
return
|
||||||
|
|
||||||
task = get_task_by_repo_branch(repo_name, branch)
|
task = get_task_by_repo_branch(repo_name, branch)
|
||||||
if not task:
|
if not task:
|
||||||
logger.debug(f"Push to '{branch}' — no matching task found")
|
logger.debug(f"Push to '{branch}' — no matching task found")
|
||||||
@@ -117,8 +123,8 @@ async def handle_push(payload: dict):
|
|||||||
if agent:
|
if agent:
|
||||||
try:
|
try:
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})")
|
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||||
|
|
||||||
@@ -167,6 +173,12 @@ async def handle_ci_status(payload: dict):
|
|||||||
return
|
return
|
||||||
|
|
||||||
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
|
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
|
||||||
|
|
||||||
|
# ORCH-6: ignore CI status for repos outside the project registry.
|
||||||
|
if not get_project_by_repo(repo_name):
|
||||||
|
logger.info(f"Gitea CI status: ignoring unknown repo '{repo_name}'")
|
||||||
|
return
|
||||||
|
|
||||||
task = get_task_by_repo_branch(repo_name, branch)
|
task = get_task_by_repo_branch(repo_name, branch)
|
||||||
if not task:
|
if not task:
|
||||||
return
|
return
|
||||||
@@ -188,8 +200,8 @@ async def handle_ci_status(payload: dict):
|
|||||||
if agent:
|
if agent:
|
||||||
try:
|
try:
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})")
|
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||||
else:
|
else:
|
||||||
@@ -221,6 +233,11 @@ async def handle_pr(payload: dict):
|
|||||||
if not head_branch:
|
if not head_branch:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# ORCH-6: ignore PR events for repos outside the project registry.
|
||||||
|
if not get_project_by_repo(repo_name):
|
||||||
|
logger.info(f"Gitea PR: ignoring unknown repo '{repo_name}'")
|
||||||
|
return
|
||||||
|
|
||||||
task = get_task_by_repo_branch(repo_name, head_branch)
|
task = get_task_by_repo_branch(repo_name, head_branch)
|
||||||
if not task:
|
if not task:
|
||||||
logger.debug(f"PR event for branch '{head_branch}' — no matching task")
|
logger.debug(f"PR event for branch '{head_branch}' — no matching task")
|
||||||
@@ -255,8 +272,8 @@ async def handle_pr(payload: dict):
|
|||||||
if agent:
|
if agent:
|
||||||
try:
|
try:
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
|
||||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})")
|
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||||
else:
|
else:
|
||||||
@@ -280,8 +297,8 @@ async def handle_pr(payload: dict):
|
|||||||
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
|
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
|
||||||
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||||
)
|
)
|
||||||
run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id)
|
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})")
|
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
notify_error(task_id, f"Failed to relaunch developer: {e}")
|
notify_error(task_id, f"Failed to relaunch developer: {e}")
|
||||||
else:
|
else:
|
||||||
|
|||||||
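The handlers above now call `enqueue_job(...)` instead of `launcher.launch(...)`, so a webhook only records work and a separate worker claims it later. A rough sketch of that hand-off is below. The `jobs` table layout and the bodies of `enqueue_job` and `claim_next_job` are assumptions; only the function names and the `agent, repo, task_desc, task_id` arguments come from the diff. A real implementation would also need an atomic claim to be safe with more than one worker.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Assumed schema: the real table is not shown in this diff.
conn.execute(
    "CREATE TABLE jobs ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " agent TEXT, repo TEXT, task_desc TEXT, task_id INTEGER,"
    " status TEXT NOT NULL DEFAULT 'queued')"
)


def enqueue_job(agent, repo, task_desc, task_id=None):
    """Persist a job and return its id; nothing is launched here."""
    cur = conn.execute(
        "INSERT INTO jobs (agent, repo, task_desc, task_id) VALUES (?, ?, ?, ?)",
        (agent, repo, task_desc, task_id),
    )
    conn.commit()
    return cur.lastrowid


def claim_next_job():
    """Take the oldest queued job and mark it running (single-worker sketch)."""
    row = conn.execute(
        "SELECT * FROM jobs WHERE status = 'queued' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    conn.execute("UPDATE jobs SET status = 'running' WHERE id = ?", (row["id"],))
    conn.commit()
    job = dict(row)
    job["status"] = "running"
    return job


first_id = enqueue_job("analyst", "orchestrator", "Stage: analysis", task_id=1)
second_id = enqueue_job("developer", "enduro-trails", "Stage: development", task_id=2)
claimed = claim_next_job()
```

Because the webhook returns as soon as the row is written, a slow or failing agent launch no longer blocks the HTTP response.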
@@ -14,6 +14,7 @@ from ..db import (
|
|||||||
get_task_by_plane_id,
|
get_task_by_plane_id,
|
||||||
get_next_work_item_id,
|
get_next_work_item_id,
|
||||||
update_task_stage,
|
update_task_stage,
|
||||||
|
enqueue_job,
|
||||||
)
|
)
|
||||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||||
from ..qg.checks import QG_CHECKS
|
from ..qg.checks import QG_CHECKS
|
||||||
@@ -24,6 +25,11 @@ from ..plane_sync import (
|
|||||||
notify_qg_failure as plane_notify_qg,
|
notify_qg_failure as plane_notify_qg,
|
||||||
notify_done as plane_notify_done,
|
notify_done as plane_notify_done,
|
||||||
)
|
)
|
||||||
|
from ..projects import (
|
||||||
|
get_project_by_plane_id,
|
||||||
|
get_project_by_repo,
|
||||||
|
known_plane_project_ids,
|
||||||
|
)
|
||||||
|
|
||||||
logger = logging.getLogger("orchestrator.webhooks.plane")
|
logger = logging.getLogger("orchestrator.webhooks.plane")
|
||||||
|
|
||||||
@@ -68,15 +74,26 @@ async def plane_webhook(request: Request):
|
|||||||
action = payload.get("action", "")
|
action = payload.get("action", "")
|
||||||
data = payload.get("data", {})
|
data = payload.get("data", {})
|
||||||
|
|
||||||
|
# ORCH-6: filter by Plane project. Ignore issues from unknown/unconfigured
|
||||||
|
# projects so a webhook on the whole workspace cannot funnel everything into
|
||||||
|
# the default repo (root cause of the 2026-06-02 incident).
|
||||||
|
project_id = data.get("project") or data.get("project_id") or ""
|
||||||
|
if project_id not in known_plane_project_ids():
|
||||||
|
logger.info(
|
||||||
|
f"Plane webhook: ignoring event '{event}' from unknown project "
|
||||||
|
f"'{project_id}' (known: {len(known_plane_project_ids())})"
|
||||||
|
)
|
||||||
|
return {"status": "ignored", "reason": "unknown project"}
|
||||||
|
|
||||||
if (event == "work_item.created") or (event == "issue" and action == "created"):
|
if (event == "work_item.created") or (event == "issue" and action == "created"):
|
||||||
await handle_work_item_created(data)
|
await handle_work_item_created(data, project_id)
|
||||||
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
|
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
|
||||||
await handle_comment(data)
|
await handle_comment(data, project_id)
|
||||||
|
|
||||||
return {"status": "accepted"}
|
return {"status": "accepted"}
|
||||||
|
|
||||||
|
|
||||||
async def handle_work_item_created(data: dict):
|
async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||||
"""
|
"""
|
||||||
New work item created in Plane.
|
New work item created in Plane.
|
||||||
QG-0: validate title, description, priority.
|
QG-0: validate title, description, priority.
|
||||||
@@ -88,7 +105,17 @@ async def handle_work_item_created(data: dict):
|
|||||||
description = data.get("description_stripped", data.get("description", ""))
|
description = data.get("description_stripped", data.get("description", ""))
|
||||||
priority = data.get("priority", {})
|
priority = data.get("priority", {})
|
||||||
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
|
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
|
||||||
repo = settings.default_repo
|
|
||||||
|
# ORCH-6: resolve repo / prefix / Plane project from the registry instead of
|
||||||
|
# the single hardcoded default_repo.
|
||||||
|
if not project_id:
|
||||||
|
project_id = data.get("project") or data.get("project_id") or ""
|
||||||
|
proj = get_project_by_plane_id(project_id)
|
||||||
|
if not proj:
|
||||||
|
logger.warning(f"handle_work_item_created: unknown project '{project_id}', ignoring {plane_id}")
|
||||||
|
return
|
||||||
|
repo = proj.repo
|
||||||
|
plane_project_id = proj.plane_project_id
|
||||||
|
|
||||||
# QG-0 validation
|
# QG-0 validation
|
||||||
errors = []
|
errors = []
|
||||||
@@ -102,17 +129,17 @@ async def handle_work_item_created(data: dict):
|
|||||||
if errors:
|
if errors:
|
||||||
# QG-0 failed
|
# QG-0 failed
|
||||||
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
|
error_text = "\u26a0\ufe0f QG-0 failed:\n" + "\n".join(f"\u2022 {e}" for e in errors)
|
||||||
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID, PLANE_STATES
|
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PLANE_STATES
|
||||||
import httpx as _httpx
|
import httpx as _httpx
|
||||||
# Post comment
|
# Post comment (ORCH-6: route to the issue's own project)
|
||||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/comments/"
|
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/comments/"
|
||||||
try:
|
try:
|
||||||
_httpx.post(url, headers=PLANE_HEADERS,
|
_httpx.post(url, headers=PLANE_HEADERS,
|
||||||
json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
|
json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
# Set blocked
|
# Set blocked
|
||||||
url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/"
|
url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{plane_project_id}/issues/{plane_id}/"
|
||||||
try:
|
try:
|
||||||
_httpx.patch(url2, headers=PLANE_HEADERS,
|
_httpx.patch(url2, headers=PLANE_HEADERS,
|
||||||
json={"state": PLANE_STATES["blocked"]}, timeout=10)
|
json={"state": PLANE_STATES["blocked"]}, timeout=10)
|
||||||
@@ -122,7 +149,7 @@ async def handle_work_item_created(data: dict):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Generate work item ID
|
# Generate work item ID
|
||||||
work_item_id = get_next_work_item_id(repo)
|
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
|
||||||
|
|
||||||
# Create slug from name
|
# Create slug from name
|
||||||
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]
|
||||||
@@ -160,8 +187,8 @@ async def handle_work_item_created(data: dict):
|
|||||||
if task_row:
|
if task_row:
|
||||||
task_id = task_row[0]
|
task_id = task_row[0]
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
||||||
run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: launched analyst (run_id={run_id})")
|
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||||
# Post start comment to Plane
|
# Post start comment to Plane
|
||||||
from ..plane_sync import add_comment as _add_comment
|
from ..plane_sync import add_comment as _add_comment
|
||||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
||||||
@@ -169,7 +196,7 @@ async def handle_work_item_created(data: dict):
|
|||||||
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
async def handle_comment(data: dict):
|
async def handle_comment(data: dict, project_id: str = ""):
|
||||||
"""
|
"""
|
||||||
Handle comment event — check for :approved: or :rejected:.
|
Handle comment event — check for :approved: or :rejected:.
|
||||||
Advance or rollback stage accordingly.
|
Advance or rollback stage accordingly.
|
||||||
@@ -205,10 +232,10 @@ async def handle_comment(data: dict):
|
|||||||
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
||||||
f"Reason: {reason}\nRevise and improve."
|
f"Reason: {reason}\nRevise and improve."
|
||||||
)
|
)
|
||||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
from ..plane_sync import add_comment as _plane_comment
|
from ..plane_sync import add_comment as _plane_comment
|
||||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
||||||
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
|
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||||
else:
|
else:
|
||||||
# Rollback to previous stage
|
# Rollback to previous stage
|
||||||
prev_stage = get_previous_stage(current_stage)
|
prev_stage = get_previous_stage(current_stage)
|
||||||
@@ -237,11 +264,15 @@ async def handle_comment(data: dict):
|
|||||||
if not issue_id:
|
if not issue_id:
|
||||||
issue_id = plane_id
|
issue_id = plane_id
|
||||||
if issue_id:
|
if issue_id:
|
||||||
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
|
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE
|
||||||
|
from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID
|
||||||
|
# ORCH-6: route to this task's own Plane project (resolved from repo).
|
||||||
|
_proj = get_project_by_repo(repo)
|
||||||
|
_pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
|
||||||
import httpx as _httpx
|
import httpx as _httpx
|
||||||
try:
|
try:
|
||||||
_resp = _httpx.get(
|
_resp = _httpx.get(
|
||||||
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/",
|
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/",
|
||||||
headers=PLANE_HEADERS, timeout=10
|
headers=PLANE_HEADERS, timeout=10
|
||||||
)
|
)
|
||||||
if _resp.status_code == 200:
|
if _resp.status_code == 200:
|
||||||
@@ -275,10 +306,10 @@ async def handle_comment(data: dict):
|
|||||||
f"Read the latest comment in Plane and revise your artifacts.\n"
|
f"Read the latest comment in Plane and revise your artifacts.\n"
|
||||||
f"Answer: {comment_body[:500]}"
|
f"Answer: {comment_body[:500]}"
|
||||||
)
|
)
|
||||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
from ..plane_sync import add_comment as _pc2
|
from ..plane_sync import add_comment as _pc2
|
||||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
||||||
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
|
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
||||||
return
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to check issue state: {e}")
|
logger.error(f"Failed to check issue state: {e}")
|
||||||
@@ -356,9 +387,9 @@ async def _try_advance_stage(
|
|||||||
if agent:
|
if agent:
|
||||||
try:
|
try:
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||||
run_id = launcher.launch(agent, repo, task_desc, task_id=task_id)
|
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
|
||||||
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
||||||
logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}")
|
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||||
logger.error(f"Agent launch failed: {e}")
|
logger.error(f"Agent launch failed: {e}")
|
||||||
|
|||||||
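The ORCH-6 changes above filter Plane events by project and resolve the repo from a registry rather than `settings.default_repo`. A minimal sketch of that lookup follows. The `Project` dataclass, the in-memory `_PROJECTS` list, the placeholder project ids, and the `route_event` helper are hypothetical; the field names (`plane_project_id`, `repo`, `work_item_prefix`) and the resolver names are taken from the diff, but their real implementation in `src/projects.py` is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Project:
    plane_project_id: str
    repo: str
    work_item_prefix: str


# Hypothetical registry contents; real ids are UUIDs from Plane.
_PROJECTS = [
    Project("plane-orch", "orchestrator", "ORCH"),
    Project("plane-enduro", "enduro-trails", "ET"),
]


def known_plane_project_ids():
    return {p.plane_project_id for p in _PROJECTS}


def get_project_by_plane_id(plane_project_id):
    return next((p for p in _PROJECTS if p.plane_project_id == plane_project_id), None)


def route_event(data):
    """Mimic the webhook filter: unknown projects are ignored, known ones routed."""
    project_id = data.get("project") or data.get("project_id") or ""
    if project_id not in known_plane_project_ids():
        return {"status": "ignored", "reason": "unknown project"}
    proj = get_project_by_plane_id(project_id)
    return {"status": "accepted", "repo": proj.repo, "prefix": proj.work_item_prefix}


ignored = route_event({"project": "someone-elses-project"})
routed = route_event({"project_id": "plane-enduro"})
missing = route_event({})
```

Rejecting unknown ids before any handler runs is what keeps a workspace-wide webhook from funnelling every issue into one default repo.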
180
tests/test_plane_webhook.py
Normal file
@@ -0,0 +1,180 @@
|
|||||||
|
"""ORCH-6: Plane webhook project-filter + repo-resolution tests.
|
||||||
|
|
||||||
|
Verifies the core of the 2026-06-02 incident fix:
|
||||||
|
* webhook from an UNKNOWN Plane project -> {"status": "ignored"} and no task
|
||||||
|
* webhook from the orchestrator project -> task created with repo=orchestrator
|
||||||
|
* webhook from the enduro project -> task created with repo=enduro-trails
|
||||||
|
|
||||||
|
launcher.launch is mocked so no real agents are spawned. Gitea branch/doc
|
||||||
|
creation is mocked (network). FastAPI TestClient drives the real endpoint.
|
||||||
|
|
||||||
|
This module configures its own registry via monkeypatch + reload_projects so it
|
||||||
|
is independent of ORCH_PROJECTS_JSON set by other test modules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Test DB / disable signature checks (same convention as test_webhooks.py).
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_plane.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||||
|
os.environ.setdefault("ORCH_GITEA_WEBHOOK_SECRET", "")
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
|
||||||
|
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||||
|
|
||||||
|
from fastapi.testclient import TestClient # noqa: E402
|
||||||
|
|
||||||
|
from src.main import app # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import projects as P # noqa: E402
|
||||||
|
from src.projects import reload_projects # noqa: E402
|
||||||
|
|
||||||
|
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
|
||||||
|
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||||
|
UNKNOWN_PLANE_ID = "deadbeef-0000-0000-0000-000000000000"
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup(monkeypatch):
|
||||||
|
"""Fresh DB + a known two-project registry for each test."""
|
||||||
|
# settings.db_path is resolved once at import; force it to our isolated DB so
|
||||||
|
# this suite is independent of whichever test module imported config first.
|
||||||
|
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||||
|
import src.db as _db
|
||||||
|
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
# The webhook signature secret may be baked into the runtime env; this suite
|
||||||
|
# focuses on the project filter, so bypass signature verification.
|
||||||
|
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||||
|
|
||||||
|
registry_json = (
|
||||||
|
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||||
|
f' "work_item_prefix": "ET", "name": "enduro-trails"}},'
|
||||||
|
f' {{"plane_project_id": "{ORCH_PLANE_ID}", "repo": "orchestrator",'
|
||||||
|
f' "work_item_prefix": "ORCH", "name": "orchestrator"}}]'
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||||
|
reload_projects()
|
||||||
|
|
||||||
|
yield
|
||||||
|
|
||||||
|
reload_projects() # restore from env
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
|
||||||
|
|
||||||
|
def _post_created(plane_project_id, plane_id="wi-1", name="A valid work item title"):
|
||||||
|
return client.post(
|
||||||
|
"/webhook/plane",
|
||||||
|
json={
|
||||||
|
"event": "work_item.created",
|
||||||
|
"data": {
|
||||||
|
"id": plane_id,
|
||||||
|
"name": name,
|
||||||
|
"description_stripped": "This is a sufficiently long description.",
|
||||||
|
"project": plane_project_id,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Filter: unknown project is ignored, no side effects
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.launcher")
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
def test_unknown_project_ignored(mock_branch, mock_docs, mock_launcher):
|
||||||
|
resp = _post_created(UNKNOWN_PLANE_ID, plane_id="ignore-me")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["status"] == "ignored"
|
||||||
|
assert resp.json().get("reason") == "unknown project"
|
||||||
|
|
||||||
|
# No task, no branch, no agent.
|
||||||
|
conn = get_db()
|
||||||
|
task = conn.execute("SELECT * FROM tasks WHERE plane_id='ignore-me'").fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert task is None
|
||||||
|
mock_branch.assert_not_called()
|
||||||
|
mock_launcher.launch.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# orchestrator project -> repo=orchestrator, prefix ORCH
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.launcher")
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
def test_orchestrator_project_routes_to_orchestrator_repo(mock_branch, mock_docs, mock_launcher):
|
||||||
|
mock_launcher.launch.return_value = 1
|
||||||
|
resp = _post_created(ORCH_PLANE_ID, plane_id="orch-1")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["status"] == "accepted"
|
||||||
|
|
||||||
|
conn = get_db()
|
||||||
|
task = conn.execute("SELECT * FROM tasks WHERE plane_id='orch-1'").fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert task is not None
|
||||||
|
assert task["repo"] == "orchestrator"
|
||||||
|
assert task["work_item_id"].startswith("ORCH-")
|
||||||
|
assert task["stage"] == "analysis"
|
||||||
|
# Branch created against the orchestrator repo.
|
||||||
|
args = mock_branch.call_args.args
|
||||||
|
assert args[0] == "orchestrator"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# enduro project -> repo=enduro-trails, prefix ET
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.launcher")
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
def test_enduro_project_routes_to_enduro_repo(mock_branch, mock_docs, mock_launcher):
|
||||||
|
mock_launcher.launch.return_value = 1
|
||||||
|
resp = _post_created(ENDURO_PLANE_ID, plane_id="et-1")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["status"] == "accepted"
|
||||||
|
|
||||||
|
conn = get_db()
|
||||||
|
task = conn.execute("SELECT * FROM tasks WHERE plane_id='et-1'").fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert task is not None
|
||||||
|
assert task["repo"] == "enduro-trails"
|
||||||
|
assert task["work_item_id"].startswith("ET-")
|
||||||
|
args = mock_branch.call_args.args
|
||||||
|
assert args[0] == "enduro-trails"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# prefixes are independent per repo (ORCH-001 vs ET-001 in parallel)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.launcher")
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
def test_prefixes_independent_per_project(mock_branch, mock_docs, mock_launcher):
|
||||||
|
mock_launcher.launch.return_value = 1
|
||||||
|
_post_created(ORCH_PLANE_ID, plane_id="o1", name="Orchestrator item one")
|
||||||
|
_post_created(ENDURO_PLANE_ID, plane_id="e1", name="Enduro item one")
|
||||||
|
_post_created(ORCH_PLANE_ID, plane_id="o2", name="Orchestrator item two")
|
||||||
|
|
||||||
|
conn = get_db()
|
||||||
|
rows = {r["plane_id"]: r["work_item_id"] for r in
|
||||||
|
conn.execute("SELECT plane_id, work_item_id FROM tasks").fetchall()}
|
||||||
|
conn.close()
|
||||||
|
assert rows["o1"] == "ORCH-001"
|
||||||
|
assert rows["o2"] == "ORCH-002"
|
||||||
|
assert rows["e1"] == "ET-001"
|
||||||
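The test above expects work-item numbering to be tracked per prefix (`ORCH-001`, `ORCH-002`, `ET-001`). A minimal sketch of one way to derive the next ID is shown here. The helper name `next_work_item_id` and the idea of scanning existing IDs are assumptions for illustration; the real allocation logic in the webhook handler is not shown in this diff.

```python
from typing import Iterable


def next_work_item_id(existing_ids: Iterable[str], prefix: str) -> str:
    """Return the next zero-padded ID for `prefix` (hypothetical helper).

    Only IDs that start with "<prefix>-" are counted, so each project
    keeps its own independent sequence.
    """
    marker = prefix + "-"
    numbers = [
        int(item[len(marker):])
        for item in existing_ids
        if item.startswith(marker) and item[len(marker):].isdigit()
    ]
    # max(..., default=0) makes the first ID for a new prefix end in 001.
    return f"{prefix}-{max(numbers, default=0) + 1:03d}"


existing = ["ORCH-001", "ET-001"]
print(next_work_item_id(existing, "ORCH"))
print(next_work_item_id(existing, "NEW"))
```

Deriving the number from existing rows, rather than keeping a separate counter, keeps the sequence correct after a restart, at the cost of a scan per allocation.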
177
tests/test_projects.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
"""ORCH-6: tests for the project registry (src/projects.py).
|
||||||
|
|
||||||
|
Covers resolvers (by plane_id, by repo, unknown -> None, known ids) against the
|
||||||
|
built-in default registry, plus ORCH_PROJECTS_JSON parsing (valid + malformed
|
||||||
|
-> default fallback).
|
||||||
|
|
||||||
|
The pure parser ``_parse_projects_json`` is tested directly so we don't mutate
|
||||||
|
the module-global registry. Resolver tests run against the default registry; if
|
||||||
|
another test (e.g. test_webhooks) set ORCH_PROJECTS_JSON in the env, we restore
|
||||||
|
the default via monkeypatch + reload_projects to keep this file order-independent.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from src import projects as P
|
||||||
|
from src.projects import (
|
||||||
|
ProjectConfig,
|
||||||
|
get_project_by_plane_id,
|
||||||
|
get_project_by_repo,
|
||||||
|
known_plane_project_ids,
|
||||||
|
reload_projects,
|
||||||
|
_parse_projects_json,
|
||||||
|
_DEFAULT_PROJECTS,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Known ids from the default registry / task spec.
|
||||||
|
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||||
|
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def default_registry(monkeypatch):
|
||||||
|
"""Force the default (built-in) registry regardless of ORCH_PROJECTS_JSON
|
||||||
|
that other test modules may have set in the process env."""
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", "")
|
||||||
|
reload_projects()
|
||||||
|
yield
|
||||||
|
# Restore from current settings (whatever env says) after the test.
|
||||||
|
reload_projects()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Resolvers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_get_project_by_plane_id_orchestrator(default_registry):
|
||||||
|
proj = get_project_by_plane_id(ORCH_PLANE_ID)
|
||||||
|
assert proj is not None
|
||||||
|
assert proj.repo == "orchestrator"
|
||||||
|
assert proj.work_item_prefix == "ORCH"
|
||||||
|
assert proj.plane_project_id == ORCH_PLANE_ID
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_project_by_plane_id_enduro(default_registry):
|
||||||
|
proj = get_project_by_plane_id(ENDURO_PLANE_ID)
|
||||||
|
assert proj is not None
|
||||||
|
assert proj.repo == "enduro-trails"
|
||||||
|
assert proj.work_item_prefix == "ET"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_project_by_plane_id_unknown_returns_none(default_registry):
|
||||||
|
assert get_project_by_plane_id("00000000-0000-0000-0000-000000000000") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_project_by_plane_id_empty_returns_none(default_registry):
|
||||||
|
assert get_project_by_plane_id("") is None
|
||||||
|
assert get_project_by_plane_id(None) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_project_by_repo(default_registry):
|
||||||
|
assert get_project_by_repo("enduro-trails").work_item_prefix == "ET"
|
||||||
|
assert get_project_by_repo("orchestrator").work_item_prefix == "ORCH"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_project_by_repo_unknown_returns_none(default_registry):
|
||||||
|
assert get_project_by_repo("does-not-exist") is None
|
||||||
|
assert get_project_by_repo("") is None
|
||||||
|
assert get_project_by_repo(None) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_known_plane_project_ids(default_registry):
|
||||||
|
ids = known_plane_project_ids()
|
||||||
|
assert isinstance(ids, set)
|
||||||
|
assert ENDURO_PLANE_ID in ids
|
||||||
|
assert ORCH_PLANE_ID in ids
|
||||||
|
assert len(ids) == len(_DEFAULT_PROJECTS)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ORCH_PROJECTS_JSON parsing (pure function, no global mutation)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_parse_empty_returns_none():
|
||||||
|
assert _parse_projects_json("") is None
|
||||||
|
assert _parse_projects_json(" ") is None
|
||||||
|
assert _parse_projects_json(None) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_valid_json():
|
||||||
|
raw = (
|
||||||
|
'[{"plane_project_id": "p-1", "repo": "repo-a", '
|
||||||
|
'"work_item_prefix": "AAA", "name": "Alpha"}]'
|
||||||
|
)
|
||||||
|
parsed = _parse_projects_json(raw)
|
||||||
|
assert parsed is not None
|
||||||
|
assert len(parsed) == 1
|
||||||
|
assert isinstance(parsed[0], ProjectConfig)
|
||||||
|
assert parsed[0].plane_project_id == "p-1"
|
||||||
|
assert parsed[0].repo == "repo-a"
|
||||||
|
assert parsed[0].work_item_prefix == "AAA"
|
||||||
|
assert parsed[0].name == "Alpha"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_valid_json_multiple():
|
||||||
|
raw = (
|
||||||
|
'[{"plane_project_id": "p-1", "repo": "repo-a", "work_item_prefix": "A"},'
|
||||||
|
' {"plane_project_id": "p-2", "repo": "repo-b", "work_item_prefix": "B"}]'
|
||||||
|
)
|
||||||
|
parsed = _parse_projects_json(raw)
|
||||||
|
assert len(parsed) == 2
|
||||||
|
# name defaults to repo when omitted
|
||||||
|
assert parsed[0].name == "repo-a"
|
||||||
|
assert parsed[1].repo == "repo-b"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_malformed_json_returns_none():
|
||||||
|
assert _parse_projects_json("{not valid json") is None
|
||||||
|
assert _parse_projects_json("[}") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_not_an_array_returns_none():
|
||||||
|
# A JSON object (not array) is invalid -> fallback.
|
||||||
|
assert _parse_projects_json('{"plane_project_id": "p-1"}') is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_skips_bad_entries_keeps_good():
|
||||||
|
raw = (
|
||||||
|
'[{"repo": "missing-id"},' # missing required key -> skipped
|
||||||
|
' {"plane_project_id": "p-2", "repo": "repo-b", "work_item_prefix": "B"}]'
|
||||||
|
)
|
||||||
|
parsed = _parse_projects_json(raw)
|
||||||
|
assert parsed is not None
|
||||||
|
assert len(parsed) == 1
|
||||||
|
assert parsed[0].plane_project_id == "p-2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_all_bad_entries_returns_none():
|
||||||
|
# No valid entries -> None (fallback to default).
|
||||||
|
assert _parse_projects_json('[{"repo": "no-id"}, "not-an-object"]') is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_reload_from_custom_json(monkeypatch):
|
||||||
|
"""End-to-end: set settings.projects_json, reload, resolvers reflect it."""
|
||||||
|
custom = (
|
||||||
|
'[{"plane_project_id": "custom-uuid", "repo": "custom-repo", '
|
||||||
|
'"work_item_prefix": "CUS", "name": "Custom"}]'
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", custom)
|
||||||
|
reload_projects()
|
||||||
|
try:
|
||||||
|
assert get_project_by_plane_id("custom-uuid").repo == "custom-repo"
|
||||||
|
assert get_project_by_repo("custom-repo").work_item_prefix == "CUS"
|
||||||
|
assert known_plane_project_ids() == {"custom-uuid"}
|
||||||
|
# The built-in defaults must NOT be present when JSON overrides.
|
||||||
|
assert get_project_by_plane_id(ENDURO_PLANE_ID) is None
|
||||||
|
finally:
|
||||||
|
reload_projects()
|
||||||
|
|
||||||
|
|
||||||
|
def test_reload_invalid_json_falls_back_to_default(monkeypatch):
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", "{garbage")
|
||||||
|
reload_projects()
|
||||||
|
try:
|
||||||
|
assert get_project_by_plane_id(ENDURO_PLANE_ID) is not None
|
||||||
|
assert get_project_by_plane_id(ORCH_PLANE_ID) is not None
|
||||||
|
finally:
|
||||||
|
reload_projects()
|
||||||
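The parsing tests above pin down a fairly specific contract: empty or malformed input returns `None`, non-array JSON returns `None`, bad entries are skipped, and `name` falls back to `repo`. A minimal sketch of a parser that satisfies that contract follows. It is an illustration only: the real `_parse_projects_json` and `ProjectConfig` live in `src/projects.py`, which is not part of this excerpt, so the field types and the name `parse_projects_json` here are assumptions.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ProjectConfig:
    # Field names mirror what the tests read; the types are assumed.
    plane_project_id: str
    repo: str
    work_item_prefix: str
    name: str


_REQUIRED_KEYS = ("plane_project_id", "repo", "work_item_prefix")


def parse_projects_json(raw: Optional[str]) -> Optional[List[ProjectConfig]]:
    """Parse a JSON array of project objects; None means 'use the defaults'."""
    if not raw or not raw.strip():
        return None
    try:
        data = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    if not isinstance(data, list):
        return None

    projects = []
    for entry in data:
        # Skip anything that is not an object or lacks a required key.
        if not isinstance(entry, dict):
            continue
        if any(not entry.get(key) for key in _REQUIRED_KEYS):
            continue
        projects.append(
            ProjectConfig(
                plane_project_id=str(entry["plane_project_id"]),
                repo=str(entry["repo"]),
                work_item_prefix=str(entry["work_item_prefix"]),
                name=str(entry.get("name") or entry["repo"]),
            )
        )
    # An array with no valid entries is treated like invalid input.
    return projects or None
```

Returning `None` instead of raising lets the caller apply a single fallback rule: any unusable configuration means the built-in registry stays in effect.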
304
tests/test_queue.py
Normal file
@@ -0,0 +1,304 @@
|
|||||||
|
"""Tests for ORCH-1 (F-2b) persistent job queue.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- enqueue_job -> claim_next_job -> mark_job lifecycle
|
||||||
|
- claim_next_job atomicity (no double-dispatch of the same job)
|
||||||
|
- retry: fail -> requeue while attempts < max_attempts, then failed
|
||||||
|
- requeue_running_jobs (queue-recovery)
|
||||||
|
- count_running_jobs / job_status_counts / recent_jobs
|
||||||
|
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
|
||||||
|
|
||||||
|
The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
|
||||||
|
tests, and the launcher finalize logic is exercised directly via mark_job.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Override env before importing app modules (same convention as test_qg.py).
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||||
|
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||||
|
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||||
|
|
||||||
|
import src.db as db
|
||||||
|
from src.db import (
|
||||||
|
init_db,
|
||||||
|
enqueue_job,
|
||||||
|
claim_next_job,
|
||||||
|
mark_job,
|
||||||
|
count_running_jobs,
|
||||||
|
requeue_running_jobs,
|
||||||
|
get_job,
|
||||||
|
job_status_counts,
|
||||||
|
recent_jobs,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def fresh_db(tmp_path, monkeypatch):
|
||||||
|
"""Point the DB at a fresh per-test sqlite file and init the schema."""
|
||||||
|
dbfile = tmp_path / "queue.db"
|
||||||
|
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
|
||||||
|
init_db()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# enqueue / claim / mark lifecycle
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestLifecycle:
|
||||||
|
def test_enqueue_creates_queued_job(self):
|
||||||
|
jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
|
||||||
|
job = get_job(jid)
|
||||||
|
assert job["status"] == "queued"
|
||||||
|
assert job["agent"] == "analyst"
|
||||||
|
assert job["repo"] == "enduro-trails"
|
||||||
|
assert job["task_content"] == "task body"
|
||||||
|
assert job["task_id"] == 7
|
||||||
|
assert job["attempts"] == 0
|
||||||
|
assert job["max_attempts"] == 2
|
||||||
|
|
||||||
|
def test_claim_marks_running_and_increments_attempts(self):
|
||||||
|
jid = enqueue_job("developer", "repo")
|
||||||
|
claimed = claim_next_job()
|
||||||
|
assert claimed is not None
|
||||||
|
assert claimed["id"] == jid
|
||||||
|
assert claimed["status"] == "running"
|
||||||
|
assert claimed["attempts"] == 1
|
||||||
|
assert count_running_jobs() == 1
|
||||||
|
|
||||||
|
def test_claim_empty_queue_returns_none(self):
|
||||||
|
assert claim_next_job() is None
|
||||||
|
|
||||||
|
def test_claim_is_fifo(self):
|
||||||
|
a = enqueue_job("analyst", "r")
|
||||||
|
b = enqueue_job("developer", "r")
|
||||||
|
assert claim_next_job()["id"] == a
|
||||||
|
assert claim_next_job()["id"] == b
|
||||||
|
|
||||||
|
def test_mark_done(self):
|
||||||
|
jid = enqueue_job("tester", "r")
|
||||||
|
claim_next_job()
|
||||||
|
mark_job(jid, "done", run_id=42)
|
||||||
|
job = get_job(jid)
|
||||||
|
assert job["status"] == "done"
|
||||||
|
assert job["run_id"] == 42
|
||||||
|
assert job["finished_at"] is not None
|
||||||
|
assert count_running_jobs() == 0
|
||||||
|
|
||||||
|
def test_mark_failed_records_error(self):
|
||||||
|
jid = enqueue_job("tester", "r")
|
||||||
|
claim_next_job()
|
||||||
|
mark_job(jid, "failed", run_id=9, error="boom")
|
||||||
|
job = get_job(jid)
|
||||||
|
assert job["status"] == "failed"
|
||||||
|
assert job["error"] == "boom"
|
||||||
|
assert job["finished_at"] is not None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# claim atomicity — no double dispatch
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestClaimAtomicity:
|
||||||
|
def test_single_job_claimed_once(self):
|
||||||
|
jid = enqueue_job("analyst", "r")
|
||||||
|
first = claim_next_job()
|
||||||
|
second = claim_next_job()
|
||||||
|
assert first["id"] == jid
|
||||||
|
assert second is None # already running, not re-dispatched
|
||||||
|
|
||||||
|
def test_concurrent_claims_no_duplicate(self):
|
||||||
|
"""Many enqueued jobs claimed from parallel threads -> each claimed once."""
|
||||||
|
import threading
|
||||||
|
|
||||||
|
n = 20
|
||||||
|
for _ in range(n):
|
||||||
|
enqueue_job("developer", "r")
|
||||||
|
|
||||||
|
claimed_ids = []
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
|
def grab():
|
||||||
|
while True:
|
||||||
|
job = claim_next_job()
|
||||||
|
if job is None:
|
||||||
|
return
|
||||||
|
with lock:
|
||||||
|
claimed_ids.append(job["id"])
|
||||||
|
|
||||||
|
threads = [threading.Thread(target=grab) for _ in range(8)]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
assert len(claimed_ids) == n
|
||||||
|
assert len(set(claimed_ids)) == n # no id claimed twice
|
||||||
|
assert count_running_jobs() == n
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# retry semantics (mirrors launcher._finalize_job logic)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestRetry:
|
||||||
|
def test_fail_requeues_while_under_max(self):
|
||||||
|
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||||
|
job = claim_next_job() # attempts=1
|
||||||
|
assert job["attempts"] == 1
|
||||||
|
# attempts(1) < max(2) -> requeue
|
||||||
|
mark_job(jid, "queued", error="exit 1")
|
||||||
|
j = get_job(jid)
|
||||||
|
assert j["status"] == "queued"
|
||||||
|
assert j["error"] == "exit 1"
|
||||||
|
assert j["started_at"] is None # requeue clears started_at
|
||||||
|
|
||||||
|
def test_fail_fails_when_max_reached(self):
|
||||||
|
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||||
|
claim_next_job() # attempts=1 -> requeue
|
||||||
|
mark_job(jid, "queued")
|
||||||
|
job2 = claim_next_job() # attempts=2
|
||||||
|
assert job2["attempts"] == 2
|
||||||
|
# attempts(2) >= max(2) -> failed
|
||||||
|
mark_job(jid, "failed", error="exit 1")
|
||||||
|
assert get_job(jid)["status"] == "failed"
|
||||||
|
|
||||||
|
def test_finalize_job_done(self):
|
||||||
|
"""launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
|
||||||
|
from src.agents.launcher import AgentLauncher
|
||||||
|
jid = enqueue_job("analyst", "r")
|
||||||
|
claim_next_job()
|
||||||
|
AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
|
||||||
|
assert get_job(jid)["status"] == "done"
|
||||||
|
|
||||||
|
def test_finalize_job_requeue_then_fail(self, monkeypatch):
|
||||||
|
from src.agents.launcher import AgentLauncher
|
||||||
|
# Silence telegram side-effect.
|
||||||
|
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||||
|
lr = AgentLauncher()
|
||||||
|
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||||
|
|
||||||
|
claim_next_job() # attempts=1
|
||||||
|
lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
|
||||||
|
assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue
|
||||||
|
|
||||||
|
claim_next_job() # attempts=2
|
||||||
|
lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
|
||||||
|
assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# queue-recovery
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestRequeueRunning:
|
||||||
|
def test_requeue_running_jobs(self):
|
||||||
|
a = enqueue_job("analyst", "r")
|
||||||
|
b = enqueue_job("developer", "r")
|
||||||
|
claim_next_job() # a -> running
|
||||||
|
claim_next_job() # b -> running
|
||||||
|
assert count_running_jobs() == 2
|
||||||
|
n = requeue_running_jobs()
|
||||||
|
assert n == 2
|
||||||
|
assert count_running_jobs() == 0
|
||||||
|
assert get_job(a)["status"] == "queued"
|
||||||
|
assert get_job(b)["status"] == "queued"
|
||||||
|
|
||||||
|
def test_requeue_preserves_attempts(self):
|
||||||
|
jid = enqueue_job("analyst", "r")
|
||||||
|
claim_next_job() # attempts=1
|
||||||
|
requeue_running_jobs()
|
||||||
|
assert get_job(jid)["attempts"] == 1 # not reset
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# observability helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestObservability:
|
||||||
|
def test_status_counts(self):
|
||||||
|
enqueue_job("analyst", "r")  # enqueued first -> claimed -> running (FIFO)
|
||||||
|
enqueue_job("developer", "r")  # stays queued
|
||||||
|
claim_next_job()
|
||||||
|
counts = job_status_counts()
|
||||||
|
assert counts["running"] == 1
|
||||||
|
assert counts["queued"] == 1
|
||||||
|
assert counts["done"] == 0
|
||||||
|
assert counts["failed"] == 0
|
||||||
|
|
||||||
|
def test_recent_jobs_desc(self):
|
||||||
|
ids = [enqueue_job("analyst", "r") for _ in range(3)]
|
||||||
|
recent = recent_jobs(10)
|
||||||
|
assert [r["id"] for r in recent] == sorted(ids, reverse=True)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestWorkerConcurrency:
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _ok_preflight(self, monkeypatch):
|
||||||
|
# ORCH-1 resilience: the worker gates claims behind preflight; in tests there
|
||||||
|
# is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
|
||||||
|
monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
|
||||||
|
|
||||||
|
def test_worker_respects_max_concurrency(self, monkeypatch):
|
||||||
|
from src.queue_worker import QueueWorker
|
||||||
|
|
||||||
|
launched = []
|
||||||
|
|
||||||
|
def fake_launch_job(job):
|
||||||
|
# Simulate a long-running agent: the job stays 'running' (we do NOT
|
||||||
|
# mark it done), so the slot remains occupied.
|
||||||
|
launched.append(job["id"])
|
||||||
|
return 100 + job["id"]
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||||
|
|
||||||
|
for _ in range(5):
|
||||||
|
enqueue_job("developer", "r")
|
||||||
|
|
||||||
|
w = QueueWorker(max_concurrency=2, poll_interval=0.01)
|
||||||
|
w._drain_once()
|
||||||
|
|
||||||
|
# Only max_concurrency jobs may be launched / running at once.
|
||||||
|
assert len(launched) == 2
|
||||||
|
assert count_running_jobs() == 2
|
||||||
|
|
||||||
|
def test_worker_drains_as_slots_free(self, monkeypatch):
|
||||||
|
from src.queue_worker import QueueWorker
|
||||||
|
|
||||||
|
def fake_launch_job(job):
|
||||||
|
# Immediately complete the job so the slot frees for the next claim.
|
||||||
|
mark_job(job["id"], "done", run_id=job["id"])
|
||||||
|
return job["id"]
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||||
|
|
||||||
|
for _ in range(4):
|
||||||
|
enqueue_job("analyst", "r")
|
||||||
|
|
||||||
|
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||||
|
w._drain_once()
|
||||||
|
|
||||||
|
# With instant completion and concurrency 1, one drain pass empties the queue.
|
||||||
|
assert job_status_counts()["done"] == 4
|
||||||
|
assert count_running_jobs() == 0
|
||||||
|
|
||||||
|
def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
|
||||||
|
from src.queue_worker import QueueWorker
|
||||||
|
|
||||||
|
def boom(job):
|
||||||
|
raise RuntimeError("repo missing")
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
|
||||||
|
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||||
|
|
||||||
|
enqueue_job("developer", "r", max_attempts=1)
|
||||||
|
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||||
|
w._drain_once()
|
||||||
|
|
||||||
|
# attempts=1 >= max_attempts=1 -> failed, not stuck running.
|
||||||
|
assert count_running_jobs() == 0
|
||||||
|
counts = job_status_counts()
|
||||||
|
assert counts["failed"] == 1
|
||||||
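The queue tests above rely on `claim_next_job` being atomic and FIFO: a job is claimed at most once, its status becomes `running`, and `attempts` is incremented. One common way to get that behaviour with SQLite is to take the write lock up front with `BEGIN IMMEDIATE`. The sketch below shows the idea under stated assumptions: the table layout, `open_queue`, and `enqueue` are hypothetical, and the real implementation in `src/db.py` (not shown in this diff) may differ, for example by also filtering on `available_at`.

```python
import sqlite3
from typing import Optional

# Assumed minimal schema; the real jobs table has more columns.
_SCHEMA = """
CREATE TABLE jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued',
    attempts INTEGER NOT NULL DEFAULT 0
)
"""


def open_queue(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None puts the connection in autocommit mode, so
    # transactions are controlled explicitly with BEGIN/COMMIT below.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.row_factory = sqlite3.Row
    conn.execute(_SCHEMA)
    return conn


def enqueue(conn: sqlite3.Connection, agent: str) -> int:
    cur = conn.execute("INSERT INTO jobs (agent) VALUES (?)", (agent,))
    return cur.lastrowid


def claim_next_job(conn: sqlite3.Connection) -> Optional[dict]:
    """Claim the oldest queued job, or return None if the queue is empty."""
    # BEGIN IMMEDIATE acquires the write lock before the SELECT, so two
    # claimers cannot both read the same queued row and then update it.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        conn.execute(
            "UPDATE jobs SET status = 'running', attempts = attempts + 1 "
            "WHERE id = ?",
            (row["id"],),
        )
        claimed = conn.execute(
            "SELECT * FROM jobs WHERE id = ?", (row["id"],)
        ).fetchone()
        conn.execute("COMMIT")
        return dict(claimed)
    except Exception:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
```

Ordering by the autoincrement `id` gives FIFO behaviour, and because the select and update run inside one write transaction, a second claimer sees the row only after its status has already changed.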
295
tests/test_resilience.py
Normal file
@@ -0,0 +1,295 @@
|
|||||||
|
"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
|
||||||
|
|
||||||
|
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
|
||||||
|
are mocked. DB is a fresh per-test sqlite file.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||||
|
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||||
|
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||||
|
|
||||||
|
import src.db as db
|
||||||
|
from src.db import (
|
||||||
|
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
|
||||||
|
mark_job_transient,
|
||||||
|
)
|
||||||
|
from src import preflight, error_classifier
|
||||||
|
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
|
||||||
|
from src.queue_worker import QueueWorker, CircuitBreaker
|
||||||
|
from src.agents.launcher import AgentLauncher
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def fresh_db(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
|
||||||
|
init_db()
|
||||||
|
preflight.reset_cache()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# A. Preflight
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestPreflight:
|
||||||
|
def test_fail_when_bin_missing(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
|
||||||
|
ok, reason = preflight.check(force=True)
|
||||||
|
assert ok is False
|
||||||
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
|
||||||
|
fake_bin = tmp_path / "claude"
|
||||||
|
fake_bin.write_text("#!/bin/sh\necho v1\n")
|
||||||
|
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||||
|
monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
|
||||||
|
ok, reason = preflight.check(force=True)
|
||||||
|
assert ok is True
|
||||||
|
|
||||||
|
def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
|
||||||
|
fake_bin = tmp_path / "claude"
|
||||||
|
fake_bin.write_text("x")
|
||||||
|
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||||
|
monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
|
||||||
|
|
||||||
|
calls = {"n": 0}
|
||||||
|
|
||||||
|
def counting_version(b):
|
||||||
|
calls["n"] += 1
|
||||||
|
return True, "ok"
|
||||||
|
|
||||||
|
monkeypatch.setattr(preflight, "_run_version", counting_version)
|
||||||
|
preflight.reset_cache()
|
||||||
|
preflight.check() # first -> runs version
|
||||||
|
preflight.check() # cached -> no extra version call
|
||||||
|
preflight.check()
|
||||||
|
assert calls["n"] == 1
|
||||||
|
|
||||||
|
def test_force_bypasses_cache(self, monkeypatch, tmp_path):
|
||||||
|
fake_bin = tmp_path / "claude"
|
||||||
|
fake_bin.write_text("x")
|
||||||
|
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||||
|
calls = {"n": 0}
|
||||||
|
monkeypatch.setattr(preflight, "_run_version",
|
||||||
|
lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1])
|
||||||
|
preflight.reset_cache()
|
||||||
|
preflight.check()
|
||||||
|
preflight.check(force=True)
|
||||||
|
assert calls["n"] == 2
|
||||||
|
|
||||||
|
def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
|
||||||
|
# Preflight FAIL -> job stays queued, launch_job never called.
|
||||||
|
monkeypatch.setattr("src.queue_worker.preflight.check",
|
||||||
|
lambda *a, **k: (False, "down"))
|
||||||
|
called = {"launch": False}
|
||||||
|
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
|
||||||
|
lambda job: called.__setitem__("launch", True))
|
||||||
|
jid = enqueue_job("analyst", "r")
|
||||||
|
QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
|
||||||
|
assert called["launch"] is False
|
||||||
|
assert get_job(jid)["status"] == "queued"
|
||||||
|
assert count_running_jobs() == 0
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# B. Error classifier
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestClassifier:
|
||||||
|
@pytest.mark.parametrize("text", [
|
||||||
|
"Error: 429 Too Many Requests",
|
||||||
|
"anthropic rate limit exceeded",
|
||||||
|
"overloaded_error: server is overloaded",
|
||||||
|
"API quota exhausted",
|
||||||
|
"503 Service Unavailable",
|
||||||
|
"connection reset by peer",
|
||||||
|
])
|
||||||
|
def test_transient_patterns(self, text):
|
||||||
|
assert classify_text(text) == "transient"
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("text", [
|
||||||
|
"Traceback: KeyError 'foo'",
|
||||||
|
"SyntaxError: invalid syntax",
|
||||||
|
"assertion failed in test",
|
||||||
|
"",
|
||||||
|
])
|
||||||
|
def test_permanent_patterns(self, text):
|
||||||
|
assert classify_text(text) == "permanent"
|
||||||
|
|
||||||
|
def test_retry_after_header(self):
|
||||||
|
assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42
|
||||||
|
|
||||||
|
def test_retry_after_json(self):
|
||||||
|
assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7
|
||||||
|
|
||||||
|
def test_retry_after_absent(self):
|
||||||
|
assert parse_retry_after("just an error") is None
|
||||||
|
|
||||||
|
def test_classify_log_file(self, tmp_path):
|
||||||
|
p = tmp_path / "run.log"
|
||||||
|
p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
|
||||||
|
kind, ra = classify_log_file(str(p))
|
||||||
|
assert kind == "transient"
|
||||||
|
assert ra == 30
|
||||||
|
|
||||||
|
def test_classify_missing_file_is_permanent(self):
|
||||||
|
kind, ra = classify_log_file("/no/such/log")
|
||||||
|
assert kind == "permanent"
|
||||||
|
assert ra is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# C. Backoff + available_at gating
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestBackoff:
|
||||||
|
def test_backoff_grows_exponentially(self):
|
||||||
|
lr = AgentLauncher()
|
||||||
|
# base=10, cap=600 (defaults)
|
||||||
|
b1 = lr._backoff_seconds(1)
|
||||||
|
b2 = lr._backoff_seconds(2)
|
||||||
|
b3 = lr._backoff_seconds(3)
|
||||||
|
assert b1 == 20 # 2^1*10
|
||||||
|
assert b2 == 40 # 2^2*10
|
||||||
|
assert b3 == 80 # 2^3*10
|
||||||
|
assert b2 > b1 and b3 > b2
|
||||||
|
|
||||||
|
def test_backoff_capped(self):
|
||||||
|
lr = AgentLauncher()
|
||||||
|
assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds
|
||||||
|
|
||||||
|
def test_retry_after_respected_when_larger(self):
|
||||||
|
lr = AgentLauncher()
|
||||||
|
# transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
|
||||||
|
assert lr._backoff_seconds(1, retry_after=120) == 120
|
||||||
|
|
||||||
|
def test_retry_after_ignored_when_smaller(self):
|
||||||
|
lr = AgentLauncher()
|
||||||
|
assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger
|
||||||
|
|
||||||
|
def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
|
||||||
|
jid = enqueue_job("developer", "r")
|
||||||
|
claim_next_job()
|
||||||
|
# Big backoff -> available_at far in the future.
|
||||||
|
mark_job_transient(jid, 3600, error="429")
|
||||||
|
job = get_job(jid)
|
||||||
|
assert job["status"] == "queued"
|
||||||
|
assert job["transient_attempts"] == 1
|
||||||
|
assert job["available_at"] is not None
|
||||||
|
# claim must NOT pick it up while available_at is in the future.
|
||||||
|
assert claim_next_job() is None
|
||||||
|
|
||||||
|
def test_transient_requeue_claimable_when_due(self):
|
||||||
|
jid = enqueue_job("developer", "r")
|
||||||
|
claim_next_job()
|
||||||
|
mark_job_transient(jid, -5, error="429") # available_at in the past
|
||||||
|
c = claim_next_job()
|
||||||
|
assert c is not None and c["id"] == jid
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# D. Launcher transient/permanent finalize (no Popen)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestFinalizeClassified:
|
||||||
|
    def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        log = tmp_path / "1.log"
        log.write_text("Error 429 rate limit exceeded\n")
        jid = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1,
                                      output_path=str(log))
        job = get_job(jid)
        assert job["status"] == "queued"
        assert job["transient_attempts"] == 1
        assert job["available_at"] is not None  # backoff-gated
        assert job["attempts"] == 1  # code-fault budget NOT burned

    def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        log = tmp_path / "2.log"
        log.write_text("Traceback: ValueError\n")
        jid = enqueue_job("developer", "r", max_attempts=2)
        claim_next_job()
        AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1,
                                      output_path=str(log))
        job = get_job(jid)
        assert job["status"] == "queued"
        assert job["transient_attempts"] == 0  # not transient
        assert job["available_at"] is None  # no backoff for code-fault

    def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
        monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
        monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
        log = tmp_path / "3.log"
        log.write_text("overloaded_error\n")
        lr = AgentLauncher()
        jid = enqueue_job("developer", "r")
        claim_next_job()
        lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log))
        assert get_job(jid)["status"] == "queued"  # transient 1 -> requeue
        # force claimable and retry
        mark_job_transient(jid, -1)  # makes it due; transient=2 now
        claim_next_job()
        lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log))
        assert get_job(jid)["status"] == "failed"  # transient budget exhausted


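The finalize tests above feed the launcher three log bodies: `Error 429 rate limit exceeded` and `overloaded_error` are treated as transient, while `Traceback: ValueError` is treated as a permanent (code-fault) failure. One way such a classifier could look is sketched below. The function `classify_failure` and the pattern list are hypothetical and derived only from those three fixtures; the actual rules used by `_finalize_job` are not visible in this diff.

```python
import re

# Assumption: markers taken from the test fixtures only, not from the launcher.
TRANSIENT_PATTERNS = [r"\b429\b", r"rate.?limit", r"overloaded_error"]
_TRANSIENT_RE = re.compile("|".join(TRANSIENT_PATTERNS), re.IGNORECASE)


def classify_failure(exit_code, log_text):
    """Return "ok", "transient" or "permanent" for a finished agent run."""
    if exit_code == 0:
        return "ok"
    if _TRANSIENT_RE.search(log_text):
        return "transient"
    return "permanent"
```

Keeping the two outcomes separate is what lets a transient failure consume `transient_attempts` and a backoff delay without burning the regular `attempts` budget, as the assertions above require.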
# ---------------------------------------------------------------------------
# E. Circuit breaker
# ---------------------------------------------------------------------------
class TestCircuitBreaker:
    def test_opens_after_threshold(self):
        cb = CircuitBreaker(threshold=3, pause_seconds=300)
        assert cb.allow_claim() is True
        cb.record_transient()
        cb.record_transient()
        assert cb.state == "closed"
        cb.record_transient()  # 3rd -> open
        assert cb.state == "open"
        assert cb.allow_claim() is False  # paused, no CLI calls

    def test_recovered_resets_streak(self):
        cb = CircuitBreaker(threshold=3)
        cb.record_transient()
        cb.record_transient()
        cb.record_recovered()
        assert cb.consecutive_transient == 0
        assert cb.state == "closed"

    def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
        cb = CircuitBreaker(threshold=2, pause_seconds=300)
        cb.record_transient()
        cb.record_transient()  # open
        assert cb.state == "open"
        # Simulate the pause elapsing.
        cb.opened_at -= 301
        assert cb.allow_claim() is True  # -> half-open (probe)
        assert cb.state == "half-open"
        cb.record_recovered()  # probe succeeded
        assert cb.state == "closed"

    def test_half_open_reopens_on_transient(self):
        cb = CircuitBreaker(threshold=2, pause_seconds=300)
        cb.record_transient()
        cb.record_transient()  # open
        cb.opened_at -= 301
        cb.allow_claim()  # half-open
        assert cb.state == "half-open"
        cb.record_transient()  # probe failed -> re-open
        assert cb.state == "open"

    def test_breaker_blocks_worker_claim(self, monkeypatch):
        monkeypatch.setattr("src.queue_worker.preflight.check",
                            lambda *a, **k: (True, "ok"))
        called = {"launch": False}
        monkeypatch.setattr("src.queue_worker.launcher.launch_job",
                            lambda job: called.__setitem__("launch", True))
        cb = CircuitBreaker(threshold=1, pause_seconds=300)
        cb.record_transient()  # open immediately
        w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
        enqueue_job("analyst", "r")
        w._drain_once()
        assert called["launch"] is False  # breaker open -> no claim, no CLI

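The `TestCircuitBreaker` cases above describe a three-state machine: `closed` until `threshold` consecutive transient failures, then `open` for `pause_seconds`, then `half-open` for a single probe that either closes or re-opens the breaker. The sketch below is one implementation that satisfies those assertions. The attribute and method names come from the tests; the default values, the use of `time.time()` for `opened_at`, and the internals are assumptions, not the project's actual class.

```python
import time


class CircuitBreaker:
    """Sketch of a breaker matching the tested behaviour (internals assumed)."""

    def __init__(self, threshold=3, pause_seconds=300.0):
        self.threshold = threshold          # consecutive transients before opening
        self.pause_seconds = pause_seconds  # how long to stay open
        self.state = "closed"
        self.consecutive_transient = 0
        self.opened_at = 0.0                # wall-clock time of the last open

    def allow_claim(self):
        """Return True if the worker may claim a job right now."""
        if self.state == "open":
            if time.time() - self.opened_at >= self.pause_seconds:
                self.state = "half-open"    # let one probe through
                return True
            return False
        return True

    def record_transient(self):
        """A job failed transiently; a failed probe re-opens immediately."""
        self.consecutive_transient += 1
        if self.state == "half-open" or self.consecutive_transient >= self.threshold:
            self.state = "open"
            self.opened_at = time.time()

    def record_recovered(self):
        """A job finished without a transient error; reset the streak."""
        self.consecutive_transient = 0
        self.state = "closed"
```

This sketch lets every caller through while the state is `half-open`; an implementation serving a worker with `max_concurrency` above 1 would likely limit that to a single in-flight probe.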
@@ -14,6 +14,12 @@ os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
# ORCH-6: register the test project so the project filter lets these fixtures
# through. proj-1 maps to enduro-trails/ET, preserving the ET-001/ET-002 asserts.
os.environ["ORCH_PROJECTS_JSON"] = (
    '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
    '"work_item_prefix": "ET", "name": "enduro-trails"}]'
)

from fastapi.testclient import TestClient
from src.main import app