Compare commits
22 Commits
feature/OR
...
feature/OR
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6abdc220d2 | ||
|
|
51401a3ba9 | ||
|
|
0befc49b1e | ||
| fd554c8a5a | |||
|
|
c167c6930d | ||
|
|
49ecb48eb0 | ||
|
|
237732bc64 | ||
| 4e52e192e4 | |||
|
|
c23f000c05 | ||
|
|
d0d47058b4 | ||
|
|
a613fd8180 | ||
|
|
f314ae09e5 | ||
|
|
90fdd19394 | ||
|
|
4ef87a3959 | ||
|
|
0cd9b11fe0 | ||
|
|
4be168c0ec | ||
|
|
2283b8898b | ||
|
|
b6d4426a48 | ||
|
|
20d6556e22 | ||
|
|
3345c2fa0a | ||
|
|
fd3dac7d22 | ||
| b021ff7cb0 |
34
README.md
34
README.md
@@ -39,6 +39,7 @@ created → analysis → architecture → development → review → testing →
|
||||
|--------|------|----------|
|
||||
| GET | `/health` | Health check |
|
||||
| GET | `/status` | Активные задачи (stage != done) |
|
||||
| GET | `/queue` | Очередь задач (ORCH-1): counts по статусам + max_concurrency + последние 10 jobs |
|
||||
| POST | `/webhook/plane` | Plane webhook receiver |
|
||||
| POST | `/webhook/gitea` | Gitea webhook receiver |
|
||||
|
||||
@@ -52,8 +53,9 @@ src/
|
||||
├── stages.py # State machine (transitions, agents, QG)
|
||||
├── notifications.py # Уведомления (логирование)
|
||||
├── plane_sync.py # Синхронизация статусов с Plane API
|
||||
├── queue_worker.py # ORCH-1: фоновый воркер очереди (claim → launch_job)
|
||||
├── agents/
|
||||
│ └── launcher.py # AgentLauncher: launch, monitor, watchdog, auto-advance
|
||||
│ └── launcher.py # AgentLauncher: launch/launch_job, monitor, watchdog, auto-advance
|
||||
├── webhooks/
|
||||
│ ├── plane.py # Plane webhook handler
|
||||
│ └── gitea.py # Gitea webhook handler (push, PR, CI status)
|
||||
@@ -107,6 +109,36 @@ uvicorn src.main:app --reload --port 8500
|
||||
| `ORCH_REPOS_DIR` | Repos dir (container) | `/repos` |
|
||||
| `ORCH_HOST_REPOS_DIR` | Repos dir (host) | `/home/slin/repos` |
|
||||
| `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` |
|
||||
| `ORCH_MAX_CONCURRENCY` | Сколько jobs воркер запускает параллельно (ORCH-1) | `1` |
|
||||
| `ORCH_QUEUE_POLL_INTERVAL` | Период опроса очереди воркером, сек (ORCH-1) | `2.0` |
|
||||
| `ORCH_PREFLIGHT_CACHE_TTL` | Кэш preflight (CLI/net), сек (ORCH-1 resilience) | `45` |
|
||||
| `ORCH_BACKOFF_BASE_SECONDS` | База exp-backoff для transient (429) | `10` |
|
||||
| `ORCH_BACKOFF_MAX_SECONDS` | Потолок backoff | `600` |
|
||||
| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Ретраи для 429/недоступности | `5` |
|
||||
| `ORCH_BREAKER_THRESHOLD` | transient подряд до открытия breaker | `3` |
|
||||
| `ORCH_BREAKER_PAUSE_SECONDS` | Пауза при открытом breaker | `300` |
|
||||
|
||||
## Очередь задач (ORCH-1 / F-2b)
|
||||
|
||||
Webhook-хэндлеры больше не спавнят claude-агентов синхронно в процессе uvicorn.
|
||||
Вместо этого они кладут **job** в персистентную SQLite-таблицу `jobs`
|
||||
(`enqueue_job`, мгновенный ответ), а фоновый воркер (`src/queue_worker.py`)
|
||||
забирает jobs с учётом `ORCH_MAX_CONCURRENCY` и запускает агента (`launch_job`,
|
||||
та же Popen-логика, что и раньше).
|
||||
|
||||
Преимущества:
|
||||
- **Рестарт-safe.** При старте jobs со статусом `running` возвращаются в `queued`
|
||||
(queue-recovery в lifespan) — работа не теряется.
|
||||
- **Лимит параллелизма.** Воркер не превышает `ORCH_MAX_CONCURRENCY`.
|
||||
- **Ретраи.** Упавший job (exit≠0) ретраится пока `attempts < max_attempts`,
|
||||
потом `failed` + Telegram-нотификация.
|
||||
|
||||
Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`.
|
||||
|
||||
**Resilience-слой:** дешёвый preflight (CLI/net, кэш, без токенов) гейтит claim;
|
||||
429/overload детектится по логу (transient vs permanent), transient ретраится с
|
||||
exp-backoff (`available_at`, Retry-After); circuit breaker паузит воркер после N
|
||||
transient подряд. Подробности: `docs/ORCH-1_JOB_QUEUE.md`.
|
||||
|
||||
## Multi-repo: реестр проектов (ORCH-6)
|
||||
|
||||
|
||||
@@ -264,9 +264,71 @@ services:
|
||||
|
||||
- ~~Shared `/repos` checkout (гонки при параллельных задачах).~~ **РЕШЕНО (ORCH-2 / S-4):**
|
||||
git worktree per task/branch — см. раздел «Изоляция через git worktree» ниже.
|
||||
- **In-process daemon-потоки.** Агенты запускаются в daemon-потоках uvicorn. При
|
||||
рестарте uvicorn запущенные агенты осиротевают → ловит orphan-recovery (M-1).
|
||||
Целевая архитектура — очередь задач (F-2b, отдельно).
|
||||
- ~~In-process daemon-потоки (рестарт → сироты, потеря работы).~~ **РЕШЕНО (ORCH-1 / F-2b):**
|
||||
персистентная очередь jobs + фоновый воркер — см. раздел «Очередь задач (ORCH-1)» ниже.
|
||||
Daemon-потоки monitor/watchdog остаются для одного запущенного агента, но при
|
||||
рестарте его job возвращается в `queued` (queue-recovery) и переподхватывается.
|
||||
|
||||
## Очередь задач (ORCH-1 / F-2b)
|
||||
|
||||
Раньше webhook-хэндлер **синхронно** спавнил `subprocess.Popen` + 2 daemon-thread
|
||||
прямо в процессе uvicorn (8 точек вызова). Рестарт = сироты + потеря работы,
|
||||
нет лимита параллелизма, нет ретраев.
|
||||
|
||||
### Flow
|
||||
|
||||
```
|
||||
webhook (plane/gitea) background thread (queue_worker)
|
||||
│ │
|
||||
enqueue_job() ---> [ jobs table ] <--- claim_next_job() (atomic queued->running)
|
||||
(мгновенный status=queued │
|
||||
ответ 200) launch_job(job)
|
||||
│
|
||||
AgentLauncher._spawn (Popen claude)
|
||||
│
|
||||
_monitor_agent (proc.wait, commit/push,
|
||||
│ advance stage)
|
||||
│
|
||||
_finalize_job:
|
||||
exit 0 -> mark_job done
|
||||
exit !=0 & attempts<max -> requeue (queued)
|
||||
exit !=0 & attempts>=max -> failed + Telegram
|
||||
```
|
||||
|
||||
### Таблица `jobs`
|
||||
|
||||
| Колонка | Назначение |
|
||||
|--------|------------|
|
||||
| `status` | `queued` → `running` → `done` \| `failed` |
|
||||
| `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) |
|
||||
| `run_id` | FK на `agent_runs.id` после старта |
|
||||
| `task_content` | ТЗ, которое пишется в task-файл агента |
|
||||
| `error` | последняя ошибка |
|
||||
|
||||
`idx_jobs_status (status, id)` — быстрый FIFO-выбор queued.
|
||||
|
||||
### Атомарный claim
|
||||
|
||||
`claim_next_job()` делает `SELECT queued ORDER BY id LIMIT 1` → `UPDATE ... WHERE id=? AND
|
||||
status='queued'` и проверяет `rowcount`. При гонке двух тиков лишь один UPDATE
|
||||
переведёт строку в `running` (rowcount==1); проигравший берёт следующий job.
|
||||
|
||||
### Queue-recovery (рестарт-safe)
|
||||
|
||||
В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`:
|
||||
jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`.
|
||||
Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
|
||||
|
||||
### Конфиг
|
||||
|
||||
- `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs.
|
||||
- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0) — период опроса.
|
||||
|
||||
Наблюдаемость: `GET /queue` — counts по статусам + последние 10 jobs.
|
||||
|
||||
> Совместимость: `launcher.launch()` (прямой синхронный запуск, `job_id=None`)
|
||||
> сохранён для обратной совместимости. Очередь использует `launch_job()`;
|
||||
> оба разделяют `_spawn()` (Popen-логика B-2 не изменена).
|
||||
- **Gitea CI не настроен.** QG развития теперь локальный (`check_tests_local`);
|
||||
Gitea CI-статусы не являются authoritative и не блокируют pipeline.
|
||||
- **Docker внутри контейнера orchestrator НЕДОСТУПЕН.** Деплой идёт только через
|
||||
|
||||
127
docs/ORCH-1_JOB_QUEUE.md
Normal file
127
docs/ORCH-1_JOB_QUEUE.md
Normal file
@@ -0,0 +1,127 @@
|
||||
# ORCH-1 (F-2b): Persistent Job Queue
|
||||
|
||||
**Дата:** 2026-06-02
|
||||
**Ветка:** `feature/ORCH-1-job-queue`
|
||||
**Источник:** AUDIT_2026-06-02 (B-2 / F-2b)
|
||||
|
||||
## Проблема
|
||||
|
||||
Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил
|
||||
`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в
|
||||
процессе uvicorn, из **8 webhook-точек**. Последствия:
|
||||
|
||||
- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты,
|
||||
работа теряется (M-1 лишь помечал `exit=-1` и звал человека).
|
||||
- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude.
|
||||
- **Нет ретраев** — упавший агент просто мёртв.
|
||||
|
||||
## Решение
|
||||
|
||||
Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер:
|
||||
|
||||
1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200.
|
||||
2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает
|
||||
jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента
|
||||
(`launcher.launch_job`, та же Popen-логика).
|
||||
3. По завершении `_monitor_agent` → `_finalize_job`:
|
||||
- `exit 0` → `done`;
|
||||
- `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
|
||||
- `exit != 0` & `attempts >= max_attempts` → `failed` + Telegram.
|
||||
|
||||
## Что изменено
|
||||
|
||||
| Файл | Изменение |
|
||||
|------|-----------|
|
||||
| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
|
||||
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
|
||||
| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch` → `enqueue_job` |
|
||||
| `src/webhooks/plane.py` | 4 точки `launcher.launch` → `enqueue_job` |
|
||||
| `src/webhooks/gitea.py` | 4 точки `launcher.launch` → `enqueue_job` |
|
||||
| `src/queue_worker.py` | **НОВЫЙ** — `QueueWorker` (drain loop + max_concurrency + graceful stop) |
|
||||
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` |
|
||||
| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
|
||||
|
||||
## Атомарность claim
|
||||
|
||||
```sql
|
||||
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
|
||||
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
|
||||
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
|
||||
```
|
||||
|
||||
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
|
||||
(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs).
|
||||
|
||||
## Сохранённые фиксы (НЕ сломаны)
|
||||
|
||||
- **B-1** task-file write (direct `open()` в worktree) — без изменений.
|
||||
- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
|
||||
- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него.
|
||||
- **ORCH-2** worktree per task — без изменений.
|
||||
- **ORCH-6** project registry/filter — без изменений.
|
||||
|
||||
## Acceptance
|
||||
|
||||
| # | Проверка | Статус |
|
||||
|---|----------|--------|
|
||||
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
|
||||
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
|
||||
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
|
||||
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
|
||||
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
|
||||
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
|
||||
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
|
||||
| 8 | `/queue` | ✅ counts + recent |
|
||||
|
||||
## Тесты
|
||||
|
||||
```bash
|
||||
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
|
||||
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
|
||||
--entrypoint python3 $IMG -m pytest tests/ -q
|
||||
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker)
|
||||
|
||||
Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса
|
||||
проблем лечатся по-разному.
|
||||
|
||||
### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены
|
||||
Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version`
|
||||
(timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с).
|
||||
FAIL → воркер НЕ claim’ит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping.
|
||||
|
||||
### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`)
|
||||
rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file`
|
||||
читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...`
|
||||
→ `transient` или `permanent`. Извлекает `Retry-After`.
|
||||
|
||||
- **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts`
|
||||
(лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет.
|
||||
- **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`.
|
||||
|
||||
### C. Backoff + `available_at`
|
||||
Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция
|
||||
`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL
|
||||
OR available_at <= datetime('now'))`. При transient: `available_at = now +
|
||||
min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max).
|
||||
|
||||
### D. Circuit breaker (`CircuitBreaker` в queue_worker)
|
||||
N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ
|
||||
не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job);
|
||||
ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти
|
||||
воркера, отражается в `/queue.resilience`.
|
||||
Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла).
|
||||
|
||||
### Конфиг (config.py)
|
||||
`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`,
|
||||
`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`.
|
||||
|
||||
### Тесты
|
||||
`tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force),
|
||||
классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After,
|
||||
`available_at` гейтинг), launcher transient/permanent finalize, breaker
|
||||
(open/half-open/closed/re-open, блок claim).
|
||||
@@ -1,10 +1,12 @@
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import signal
|
||||
import time
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from ..git_worktree import ensure_worktree, get_worktree_path
|
||||
from ..qg.checks import QG_CHECKS
|
||||
@@ -53,11 +55,17 @@ class AgentLauncher:
|
||||
}
|
||||
|
||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||
AGENT_TIMEOUT = 1800 # 30 minutes
|
||||
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
|
||||
# backward-compatible alias for the default; the actual value (and per-agent
|
||||
# overrides) live in settings and are resolved via _resolve_timeout().
|
||||
AGENT_TIMEOUT = settings.agent_timeout_seconds
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
|
||||
"""
|
||||
Launch a Claude CLI agent.
|
||||
Launch a Claude CLI agent directly (legacy synchronous path).
|
||||
|
||||
Kept for backward compatibility (direct callers / existing tests). The
|
||||
ORCH-1 job queue uses launch_job() instead, but both share _spawn().
|
||||
|
||||
Args:
|
||||
agent: Agent role (analyst, architect, developer, reviewer, tester)
|
||||
@@ -68,6 +76,31 @@ class AgentLauncher:
|
||||
Returns:
|
||||
agent_run_id from DB
|
||||
"""
|
||||
return self._spawn(agent, repo, task_content, task_id, job_id=None)
|
||||
|
||||
def launch_job(self, job: dict) -> int:
|
||||
"""ORCH-1: launch an agent for a claimed queue job.
|
||||
|
||||
Same spawn path as launch(), but threads job['id'] through so the monitor
|
||||
can update the job's status (done / requeue / failed) and link jobs.run_id
|
||||
to the agent_runs row. Returns the agent_run_id.
|
||||
"""
|
||||
return self._spawn(
|
||||
job["agent"],
|
||||
job["repo"],
|
||||
job.get("task_content"),
|
||||
job.get("task_id"),
|
||||
job_id=job["id"],
|
||||
)
|
||||
|
||||
def _spawn(self, agent: str, repo: str, task_content: str = None,
|
||||
task_id: int = None, job_id: int = None) -> int:
|
||||
"""Shared spawn implementation for launch() and launch_job().
|
||||
|
||||
When job_id is set, the monitor/watchdog drive the jobs table status
|
||||
(ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
|
||||
(B-1 / ORCH-2) are unchanged.
|
||||
"""
|
||||
config = self.AGENT_CONFIGS.get(agent)
|
||||
if not config:
|
||||
raise ValueError(f"Unknown agent: {agent}")
|
||||
@@ -98,6 +131,14 @@ class AgentLauncher:
|
||||
run_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
|
||||
# ORCH-1: link this job to the agent_runs row and stamp started_at.
|
||||
if job_id is not None:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
|
||||
(run_id, job_id),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Prepare output log path
|
||||
output_path = f"/app/data/runs/{run_id}.log"
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
@@ -154,6 +195,7 @@ class AgentLauncher:
|
||||
t = threading.Thread(
|
||||
target=self._watchdog,
|
||||
args=(proc.pid, run_id),
|
||||
kwargs={"job_id": job_id, "agent": agent},
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
@@ -163,6 +205,7 @@ class AgentLauncher:
|
||||
m = threading.Thread(
|
||||
target=self._monitor_agent,
|
||||
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
|
||||
kwargs={"job_id": job_id},
|
||||
daemon=True,
|
||||
)
|
||||
m.start()
|
||||
@@ -171,26 +214,102 @@ class AgentLauncher:
|
||||
notify_agent_started(run_id, agent, task_id)
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
|
||||
"""Kill agent if it exceeds timeout."""
|
||||
import time
|
||||
@staticmethod
|
||||
def _resolve_timeout(agent: str = None) -> int:
|
||||
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
|
||||
|
||||
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
|
||||
like {"reviewer": 3600}) wins; otherwise the global default
|
||||
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
|
||||
(falls back to the default) and only logged, so a bad env never bricks runs.
|
||||
"""
|
||||
default = settings.agent_timeout_seconds
|
||||
raw = (settings.agent_timeout_overrides_json or "").strip()
|
||||
if agent and raw:
|
||||
try:
|
||||
overrides = json.loads(raw)
|
||||
if isinstance(overrides, dict) and agent in overrides:
|
||||
return int(overrides[agent])
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
|
||||
return default
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
|
||||
job_id: int = None, agent: str = None):
|
||||
"""Kill agent if it exceeds its timeout.
|
||||
|
||||
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
|
||||
code and drives the job retry/fail logic, so the watchdog itself only needs
|
||||
to terminate the process and record the agent_runs exit. job_id is accepted
|
||||
for symmetry.
|
||||
|
||||
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
|
||||
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
|
||||
give the process up to settings.agent_kill_grace_seconds to flush and exit on
|
||||
its own, and only SIGKILL if it is still alive after the grace window. If the
|
||||
process exits during the grace window, SIGKILL is NOT sent.
|
||||
ProcessLookupError is tolerated at every step (the process may already be
|
||||
gone). The recorded exit_code stays -9 to match the existing retry/fail
|
||||
contract regardless of which signal actually reaped it.
|
||||
"""
|
||||
if timeout is None:
|
||||
timeout = self.AGENT_TIMEOUT
|
||||
timeout = self._resolve_timeout(agent)
|
||||
time.sleep(timeout)
|
||||
|
||||
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
|
||||
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
|
||||
return # nothing to record: the monitor's proc.wait() owns the exit
|
||||
|
||||
# Phase 2: poll for graceful exit within the grace window.
|
||||
grace = settings.agent_kill_grace_seconds
|
||||
poll_interval = 0.5
|
||||
waited = 0.0
|
||||
while waited < grace:
|
||||
time.sleep(poll_interval)
|
||||
waited += poll_interval
|
||||
try:
|
||||
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
|
||||
except ProcessLookupError:
|
||||
logger.info(
|
||||
f"Agent run_id={run_id} exited gracefully after SIGTERM "
|
||||
f"({waited:.1f}s); no SIGKILL needed"
|
||||
)
|
||||
self._record_kill(run_id)
|
||||
return
|
||||
|
||||
# Phase 3: still alive -> hard SIGKILL.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
|
||||
self._record_kill(run_id)
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
|
||||
@staticmethod
|
||||
def _record_kill(run_id: int):
|
||||
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
|
||||
|
||||
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
|
||||
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
|
||||
(run_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB.
|
||||
|
||||
B-2 fix: stdout already goes straight to the log file via Popen, so we just
|
||||
@@ -318,8 +437,142 @@ class AgentLauncher:
|
||||
if exit_code == 0:
|
||||
self._try_advance_stage(run_id, agent, repo, branch)
|
||||
|
||||
# ORCH-1: drive the job-queue status for queue-launched jobs only.
|
||||
# (Legacy direct launch() has job_id=None and is unaffected.)
|
||||
if job_id is not None:
|
||||
self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)
|
||||
|
||||
def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
|
||||
"""Exponential backoff for transient failures, honouring Retry-After.
|
||||
|
||||
backoff = min(2^transient_attempts * base, max). If the server sent a
|
||||
Retry-After, use the larger of the two (never poll sooner than asked).
|
||||
"""
|
||||
base = settings.backoff_base_seconds
|
||||
cap = settings.backoff_max_seconds
|
||||
backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||||
if retry_after is not None and retry_after > 0:
|
||||
backoff = max(backoff, min(retry_after, cap))
|
||||
return int(backoff)
|
||||
|
||||
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
|
||||
"""ORCH-1: update the jobs row after the agent process finished.
|
||||
|
||||
exit_code == 0 -> done (and resets the breaker streak via on_outcome).
|
||||
exit_code != 0 -> classify the failure from the run log tail (token-free):
|
||||
- TRANSIENT (429/overload/network): backoff-requeue with available_at in
|
||||
the future + a SEPARATE transient_attempts budget
|
||||
(settings.transient_max_attempts), honouring Retry-After. Reported to
|
||||
the breaker so it opens after N consecutive transient failures.
|
||||
- PERMANENT (code fault): ordinary attempts < max_attempts requeue,
|
||||
otherwise 'failed' + Telegram.
|
||||
"""
|
||||
from ..db import get_job, mark_job
|
||||
from ..error_classifier import classify_log_file
|
||||
try:
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
return
|
||||
if exit_code == 0:
|
||||
mark_job(job_id, "done", run_id=run_id)
|
||||
logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
|
||||
self._record_outcome(transient=False, recovered=True)
|
||||
return
|
||||
|
||||
# Classify the failure from the agent log tail (no token cost).
|
||||
kind, retry_after = "permanent", None
|
||||
log_path = output_path or f"/app/data/runs/{run_id}.log"
|
||||
try:
|
||||
kind, retry_after = classify_log_file(log_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if kind == "transient":
|
||||
self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
|
||||
else:
|
||||
self._finalize_permanent(job_id, agent, run_id, exit_code, job)
|
||||
except Exception as e:
|
||||
logger.error(f"Job {job_id}: _finalize_job error: {e}")
|
||||
|
||||
def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
|
||||
"""Transient (429/overload/net) failure -> backoff requeue or fail when budget out."""
|
||||
from ..db import mark_job, mark_job_transient
|
||||
tattempts = job.get("transient_attempts", 0)
|
||||
tmax = settings.transient_max_attempts
|
||||
err = (f"transient (429/overload) agent {agent} exit={exit_code} "
|
||||
f"(run_id={run_id}); retry_after={retry_after}")
|
||||
self._record_outcome(transient=True, recovered=False)
|
||||
if tattempts < tmax:
|
||||
backoff = self._backoff_seconds(tattempts + 1, retry_after)
|
||||
mark_job_transient(job_id, backoff, error=err)
|
||||
logger.warning(
|
||||
f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
|
||||
f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
|
||||
)
|
||||
else:
|
||||
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||
logger.error(
|
||||
f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
|
||||
)
|
||||
self._notify_failed(job_id, agent, job, run_id,
|
||||
f"transient (rate-limit) after {tattempts} attempts")
|
||||
|
||||
def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
|
||||
"""Permanent (code-fault) failure -> normal attempts<max requeue, then fail."""
|
||||
from ..db import mark_job
|
||||
attempts = job.get("attempts", 0)
|
||||
max_attempts = job.get("max_attempts", 2)
|
||||
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
|
||||
self._record_outcome(transient=False, recovered=False)
|
||||
if attempts < max_attempts:
|
||||
mark_job(job_id, "queued", run_id=run_id, error=err)
|
||||
logger.warning(
|
||||
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
|
||||
f"requeued (attempt {attempts}/{max_attempts})"
|
||||
)
|
||||
else:
|
||||
mark_job(job_id, "failed", run_id=run_id, error=err)
|
||||
logger.error(
|
||||
f"Job {job_id} ({agent}) failed permanently after "
|
||||
f"{attempts} attempts (exit={exit_code})"
|
||||
)
|
||||
self._notify_failed(job_id, agent, job, run_id,
|
||||
f"{attempts} attempts (exit={exit_code})")
|
||||
|
||||
def _notify_failed(self, job_id, agent, job, run_id, why):
|
||||
try:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
|
||||
f"failed: {why}. Logs: /app/data/runs/{run_id}.log"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _record_outcome(self, transient: bool, recovered: bool):
|
||||
"""Forward the run outcome to the circuit breaker (if a worker is wired).
|
||||
|
||||
Decoupled via a settable callback (set by QueueWorker.start) so the launcher
|
||||
does not hard-import the worker (avoids a cycle) and tests can run the
|
||||
launcher standalone.
|
||||
"""
|
||||
cb = getattr(self, "on_outcome", None)
|
||||
if cb:
|
||||
try:
|
||||
cb(transient=transient, recovered=recovered)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
|
||||
"""After agent finishes successfully, check QG and advance stage if possible."""
|
||||
"""After agent finishes successfully, advance the stage via the unified engine.
|
||||
|
||||
ORCH-4 / M-3: the 174-line body that used to live here moved into
|
||||
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
|
||||
the task by (repo, branch) and delegates. `agent` is forwarded as
|
||||
finished_agent so the analyst/reviewer/tester/architect rollback branches
|
||||
still trigger exactly as before. The agent-selection bug (it used to call
|
||||
get_agent_for_stage(next_stage)) is fixed inside the engine.
|
||||
"""
|
||||
try:
|
||||
conn = get_db()
|
||||
task_row = conn.execute(
|
||||
@@ -331,174 +584,15 @@ class AgentLauncher:
|
||||
return
|
||||
|
||||
task_id, current_stage, work_item_id = task_row
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
|
||||
if not next_stage:
|
||||
return
|
||||
|
||||
# Run QG check if defined
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
check_fn = QG_CHECKS[qg_name]
|
||||
if qg_name in ("check_analysis_approved",):
|
||||
# Requires human approval - post request comment if analyst just finished
|
||||
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if files_check:
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review
|
||||
from ..plane_sync import set_issue_in_review
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
|
||||
else:
|
||||
# Check if questions file exists (in the task worktree)
|
||||
import os as _os
|
||||
questions_path = _os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/01-questions.md"
|
||||
)
|
||||
if _os.path.isfile(questions_path):
|
||||
# Analyst has questions -> Needs Input
|
||||
from ..plane_sync import set_issue_needs_input
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
else:
|
||||
# No artifacts and no questions
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
|
||||
)
|
||||
return
|
||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
||||
# (repo, branch) signature — already worktree-aware.
|
||||
passed, reason = check_fn(repo, branch)
|
||||
elif qg_name == "check_tests_passed":
|
||||
# Artifact check — pass branch so it reads from the worktree.
|
||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
||||
else:
|
||||
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
|
||||
passed, reason = check_fn(repo, work_item_id or "", branch)
|
||||
|
||||
if not passed:
|
||||
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
|
||||
# If reviewer says REQUEST_CHANGES, rollback to development
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
# Count retries
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Task 6: Tester FAIL -> rollback to development
|
||||
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
)
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
from ..plane_sync import set_issue_blocked
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
|
||||
|
||||
# Task 8: Architect conflict -> rollback to analysis
|
||||
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
|
||||
import os as _os
|
||||
conflict_path = _os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
if _os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
|
||||
return
|
||||
|
||||
return
|
||||
elif qg_name:
|
||||
return
|
||||
|
||||
# Advance stage
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
|
||||
|
||||
# Launch next agent if defined
|
||||
next_agent = get_agent_for_stage(next_stage)
|
||||
if next_agent:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
|
||||
|
||||
from ..stage_engine import advance_stage
|
||||
advance_stage(
|
||||
task_id=task_id,
|
||||
current_stage=current_stage,
|
||||
repo=repo,
|
||||
work_item_id=work_item_id,
|
||||
branch=branch,
|
||||
finished_agent=agent,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
@@ -534,47 +628,6 @@ class AgentLauncher:
|
||||
logger.error(f"Failed to create PR for {branch}: {e}")
|
||||
return None
|
||||
|
||||
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
try:
|
||||
resp = httpx.get(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
prs = resp.json()
|
||||
if not prs:
|
||||
pr_number = self._ensure_pr(repo, branch, 0)
|
||||
if not pr_number:
|
||||
return False
|
||||
else:
|
||||
pr_number = prs[0]["number"]
|
||||
resp = httpx.post(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
|
||||
json={"Do": "merge"},
|
||||
headers=headers, timeout=30
|
||||
)
|
||||
if resp.status_code in (200, 204):
|
||||
logger.info(f"PR #{pr_number} merged for {branch}")
|
||||
update_task_stage(task_id, "done")
|
||||
notify_stage_change(task_id, "deploy", "done")
|
||||
plane_notify_stage(work_item_id, "deploy", "done")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
||||
return False
|
||||
|
||||
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
|
||||
"""Write task file directly into the task's worktree.
|
||||
|
||||
|
||||
@@ -30,6 +30,42 @@ class Settings(BaseSettings):
|
||||
# DB
|
||||
db_path: str = "/app/data/orchestrator.db"
|
||||
|
||||
# ORCH-1 (F-2b): persistent job queue / background worker.
|
||||
# max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY)
|
||||
# queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL)
|
||||
max_concurrency: int = 1
|
||||
queue_poll_interval: float = 2.0
|
||||
|
||||
# ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker.
|
||||
# preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds);
|
||||
# the worker does NOT re-run `claude --version` more often
|
||||
# than this (env ORCH_PREFLIGHT_CACHE_TTL).
|
||||
# backoff_base_seconds -> base for exponential transient backoff.
|
||||
# backoff_max_seconds -> ceiling for the transient backoff.
|
||||
# transient_max_attempts -> retry budget for transient (429/overload/network)
|
||||
# failures, separate from code-fault `attempts`.
|
||||
# breaker_threshold -> consecutive transient failures that OPEN the breaker.
|
||||
# breaker_pause_seconds -> how long the breaker stays open before half-open.
|
||||
preflight_cache_ttl: int = 45
|
||||
backoff_base_seconds: int = 10
|
||||
backoff_max_seconds: int = 600
|
||||
transient_max_attempts: int = 5
|
||||
breaker_threshold: int = 3
|
||||
breaker_pause_seconds: int = 300
|
||||
|
||||
# ORCH-7 (M-2): agent timeout + graceful kill.
|
||||
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
|
||||
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
|
||||
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
|
||||
# flush artifacts before the hard kill
|
||||
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
|
||||
# agent_timeout_overrides_json -> optional per-agent override JSON object,
|
||||
# e.g. {"reviewer": 3600, "architect": 2700}
|
||||
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
|
||||
agent_timeout_seconds: int = 1800
|
||||
agent_kill_grace_seconds: int = 20
|
||||
agent_timeout_overrides_json: str = ""
|
||||
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
|
||||
266
src/db.py
266
src/db.py
@@ -40,10 +40,44 @@ def init_db():
|
||||
exit_code INTEGER,
|
||||
output_path TEXT
|
||||
);
|
||||
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
|
||||
-- return immediately; a background worker claims jobs (respecting
|
||||
-- max_concurrency), spawns the claude agent, and updates the status.
|
||||
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent TEXT NOT NULL,
|
||||
repo TEXT NOT NULL,
|
||||
task_id INTEGER, -- FK tasks.id (nullable)
|
||||
task_content TEXT, -- written to the agent task_file
|
||||
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||||
run_id INTEGER, -- agent_runs.id once started
|
||||
error TEXT, -- last error message
|
||||
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
|
||||
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
started_at TEXT,
|
||||
finished_at TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
||||
""")
|
||||
# Lightweight migration: add resilience columns to a pre-existing jobs table
|
||||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||
conn.close()
|
||||
|
||||
|
||||
def _ensure_column(conn, table: str, column: str, decl: str):
|
||||
"""Add a column to `table` if it does not already exist (idempotent migration)."""
|
||||
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||
if column not in cols:
|
||||
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
||||
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
||||
conn = get_db()
|
||||
@@ -105,3 +139,235 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
next_num = 1
|
||||
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1 (F-2b): job queue helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def enqueue_job(
|
||||
agent: str,
|
||||
repo: str,
|
||||
task_content: str | None = None,
|
||||
task_id: int | None = None,
|
||||
max_attempts: int = 2,
|
||||
) -> int:
|
||||
"""Enqueue a new job (status='queued'). Returns the new job id.
|
||||
|
||||
This is what webhook handlers call instead of launching an agent in-process:
|
||||
it is a fast DB INSERT that returns immediately. The background worker
|
||||
(queue_worker) picks the job up later.
|
||||
"""
|
||||
conn = get_db()
|
||||
cursor = conn.execute(
|
||||
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(agent, repo, task_id, task_content, max_attempts),
|
||||
)
|
||||
job_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return job_id
|
||||
|
||||
|
||||
def claim_next_job() -> dict | None:
|
||||
"""Atomically claim the oldest queued job and mark it 'running'.
|
||||
|
||||
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
|
||||
and we check `rowcount`. If two worker ticks race for the same row, only the
|
||||
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
|
||||
and retries the SELECT. We rely on SQLite's default per-connection transaction
|
||||
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
|
||||
when the queue is empty.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
while True:
|
||||
row = conn.execute(
|
||||
"SELECT id FROM jobs WHERE status='queued' "
|
||||
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
||||
"ORDER BY id LIMIT 1"
|
||||
).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
job_id = row["id"]
|
||||
cur = conn.execute(
|
||||
"UPDATE jobs SET status='running', "
|
||||
"attempts = attempts + 1, started_at = datetime('now') "
|
||||
"WHERE id = ? AND status='queued'",
|
||||
(job_id,),
|
||||
)
|
||||
conn.commit()
|
||||
if cur.rowcount == 1:
|
||||
claimed = conn.execute(
|
||||
"SELECT * FROM jobs WHERE id = ?", (job_id,)
|
||||
).fetchone()
|
||||
return dict(claimed)
|
||||
# Lost the race for this row; loop and try the next queued job.
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
|
||||
error: str | None = None) -> None:
|
||||
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
|
||||
|
||||
Increments `transient_attempts` (separate from the code-fault `attempts`),
|
||||
sets status back to 'queued', and gates re-pickup via `available_at` =
|
||||
now + backoff seconds. started_at/finished_at are cleared.
|
||||
"""
|
||||
conn = get_db()
|
||||
sets = [
|
||||
"status='queued'",
|
||||
"transient_attempts = transient_attempts + 1",
|
||||
"available_at = datetime('now', ?)",
|
||||
"started_at = NULL",
|
||||
"finished_at = NULL",
|
||||
]
|
||||
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
|
||||
if error is not None:
|
||||
sets.append("error = ?")
|
||||
params.append(error)
|
||||
params.append(job_id)
|
||||
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_job(
|
||||
job_id: int,
|
||||
status: str,
|
||||
run_id: int | None = None,
|
||||
error: str | None = None,
|
||||
):
|
||||
"""Update a job's status (queued|running|done|failed).
|
||||
|
||||
- run_id (optional): link to the agent_runs row that executed this job.
|
||||
- error (optional): last error message (for failed/retry).
|
||||
- 'done'/'failed' also stamp finished_at.
|
||||
- 'queued' (requeue for retry) clears started_at/finished_at so the next
|
||||
claim treats it as fresh.
|
||||
"""
|
||||
conn = get_db()
|
||||
sets = ["status = ?"]
|
||||
params: list = [status]
|
||||
if run_id is not None:
|
||||
sets.append("run_id = ?")
|
||||
params.append(run_id)
|
||||
if error is not None:
|
||||
sets.append("error = ?")
|
||||
params.append(error)
|
||||
if status in ("done", "failed"):
|
||||
sets.append("finished_at = datetime('now')")
|
||||
elif status == "queued":
|
||||
sets.append("started_at = NULL")
|
||||
sets.append("finished_at = NULL")
|
||||
params.append(job_id)
|
||||
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def count_running_jobs() -> int:
|
||||
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM jobs WHERE status='running'"
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
return int(n)
|
||||
|
||||
|
||||
def requeue_running_jobs() -> int:
|
||||
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
|
||||
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
|
||||
claim does NOT re-increment beyond what is needed; claim_next_job increments on
|
||||
pickup). Returns the number of requeued jobs.
|
||||
"""
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"UPDATE jobs SET status='queued', started_at = NULL "
|
||||
"WHERE status='running'"
|
||||
)
|
||||
conn.commit()
|
||||
n = cur.rowcount
|
||||
conn.close()
|
||||
return int(n)
|
||||
|
||||
|
||||
def get_job(job_id: int) -> dict | None:
|
||||
"""Fetch a single job by id."""
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
||||
conn.close()
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
def job_status_counts() -> dict:
|
||||
"""Return counts grouped by status (for /queue observability)."""
|
||||
conn = get_db()
|
||||
rows = conn.execute(
|
||||
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
|
||||
for r in rows:
|
||||
counts[r["status"]] = r["n"]
|
||||
return counts
|
||||
|
||||
|
||||
def recent_jobs(limit: int = 10) -> list[dict]:
|
||||
"""Return the most recent jobs (for /queue observability)."""
|
||||
conn = get_db()
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1b (resilience): transient backoff helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
|
||||
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
|
||||
|
||||
Unlike a code-fault requeue, this:
|
||||
- increments `transient_attempts` (a separate budget from code-fault attempts)
|
||||
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
|
||||
up until the backoff window elapses
|
||||
- sets status back to 'queued' and clears started_at/finished_at
|
||||
|
||||
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status='queued', "
|
||||
"transient_attempts = transient_attempts + 1, "
|
||||
"available_at = datetime('now', ? || ' seconds'), "
|
||||
"started_at = NULL, finished_at = NULL, "
|
||||
"error = COALESCE(?, error) "
|
||||
"WHERE id = ?",
|
||||
(f"+{int(round(delay_seconds))}", error, job_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
|
||||
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
|
||||
|
||||
delay = min(2**transient_attempts * base, max). If the server sent a
|
||||
Retry-After hint we honour it as a floor (use the larger of the two so we
|
||||
never poll sooner than the server asked).
|
||||
|
||||
`transient_attempts` is the count AFTER this failure (i.e. how many transient
|
||||
failures have occurred), so the first backoff uses 2**1.
|
||||
"""
|
||||
base = getattr(settings, "backoff_base_seconds", 10)
|
||||
cap = getattr(settings, "backoff_max_seconds", 600)
|
||||
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||||
if retry_after is not None and retry_after > 0:
|
||||
return float(min(max(exp, retry_after), cap))
|
||||
return float(exp)
|
||||
|
||||
87
src/error_classifier.py
Normal file
87
src/error_classifier.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""ORCH-1 resilience: classify an agent failure as transient vs permanent.
|
||||
|
||||
Rate limits / overload / network blips cannot be reliably predicted in advance,
|
||||
so we classify *after the run* by scanning the agent's combined stdout/stderr log
|
||||
(B-2 sends both to /app/data/runs/<run_id>.log).
|
||||
|
||||
- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc.
|
||||
=> backoff + transient retry (separate counter, larger budget).
|
||||
- permanent -> a genuine code fault / agent error
|
||||
=> normal attempts < max_attempts, then 'failed'.
|
||||
|
||||
Also extracts a Retry-After hint (seconds) when the server provided one.
|
||||
"""
|
||||
import re
|
||||
|
||||
# Case-insensitive substrings/patterns that signal a transient/rate-limit issue.
|
||||
_TRANSIENT_PATTERNS = [
|
||||
r"\b429\b",
|
||||
r"rate[\s_-]*limit",
|
||||
r"rate_limit_error",
|
||||
r"overloaded",
|
||||
r"overloaded_error",
|
||||
r"too many requests",
|
||||
r"quota",
|
||||
r"insufficient[_\s-]*quota",
|
||||
r"retry[\s-]*after",
|
||||
r"service unavailable",
|
||||
r"\b503\b",
|
||||
r"\b529\b",
|
||||
r"timed out",
|
||||
r"timeout",
|
||||
r"connection (reset|refused|error|aborted)",
|
||||
r"temporarily unavailable",
|
||||
r"econnreset",
|
||||
r"etimedout",
|
||||
]
|
||||
|
||||
_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE)
|
||||
|
||||
# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or
|
||||
# "retry after 30 seconds". Returns the integer seconds.
|
||||
_RETRY_AFTER_RE = re.compile(
|
||||
r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def classify_text(text: str) -> str:
|
||||
"""Return 'transient' or 'permanent' for a chunk of log/stderr text."""
|
||||
if not text:
|
||||
return "permanent"
|
||||
return "transient" if _TRANSIENT_RE.search(text) else "permanent"
|
||||
|
||||
|
||||
def parse_retry_after(text: str) -> int | None:
|
||||
"""Return Retry-After seconds if present in the text, else None."""
|
||||
if not text:
|
||||
return None
|
||||
m = _RETRY_AFTER_RE.search(text)
|
||||
if m:
|
||||
try:
|
||||
return int(m.group(1))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]:
|
||||
"""Classify the tail of a log file.
|
||||
|
||||
Reads the last `tail_bytes` of the log (rate-limit messages appear near the
|
||||
end) and returns (classification, retry_after_seconds_or_None).
|
||||
On any read error, treats it as 'permanent' (no special backoff).
|
||||
"""
|
||||
if not path:
|
||||
return "permanent", None
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
try:
|
||||
f.seek(-tail_bytes, 2)
|
||||
except OSError:
|
||||
f.seek(0)
|
||||
data = f.read()
|
||||
text = data.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return "permanent", None
|
||||
return classify_text(text), parse_retry_after(text)
|
||||
34
src/main.py
34
src/main.py
@@ -51,7 +51,25 @@ async def lifespan(app: FastAPI):
|
||||
except Exception:
|
||||
pass
|
||||
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
|
||||
yield
|
||||
|
||||
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
|
||||
# worker that died on the previous restart -> put it back to 'queued' so the
|
||||
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
|
||||
from .db import requeue_running_jobs
|
||||
requeued = requeue_running_jobs()
|
||||
if requeued:
|
||||
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
|
||||
|
||||
# Start the background job-queue worker (ORCH-1).
|
||||
from .queue_worker import worker
|
||||
worker.start()
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Graceful shutdown of the worker (running agents keep going; their jobs
|
||||
# are requeued on next start via queue-recovery if the process dies).
|
||||
worker.stop()
|
||||
|
||||
|
||||
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
|
||||
@@ -73,3 +91,17 @@ async def status():
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return {"active_tasks": [dict(t) for t in tasks]}
|
||||
|
||||
|
||||
@app.get("/queue")
|
||||
async def queue():
|
||||
"""ORCH-1: job-queue observability — status counts + recent jobs."""
|
||||
from .db import job_status_counts, recent_jobs
|
||||
from .queue_worker import worker
|
||||
return {
|
||||
"counts": job_status_counts(),
|
||||
"max_concurrency": worker.max_concurrency,
|
||||
"poll_interval": worker.poll_interval,
|
||||
"resilience": worker.status(),
|
||||
"recent": recent_jobs(10),
|
||||
}
|
||||
|
||||
106
src/preflight.py
Normal file
106
src/preflight.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""ORCH-1 resilience: cheap preflight check (CLI / network available?).
|
||||
|
||||
Goal: before the worker claims a job, confirm the claude CLI binary and runtime
|
||||
are reachable WITHOUT spending any tokens. We only do local/cheap checks:
|
||||
|
||||
1. os.path.exists(CLAUDE_BIN) -- instant
|
||||
2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API
|
||||
|
||||
The result is cached for `preflight_cache_ttl` seconds so we do not re-run
|
||||
`claude --version` on every worker tick.
|
||||
|
||||
🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
|
||||
rate limit and add latency. Preflight is local-only.
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.preflight")
|
||||
|
||||
_VERSION_TIMEOUT = 5
|
||||
|
||||
|
||||
class _PreflightCache:
|
||||
def __init__(self):
|
||||
self.ts: float = 0.0
|
||||
self.ok: bool = False
|
||||
self.reason: str = "not checked yet"
|
||||
|
||||
|
||||
_cache = _PreflightCache()
|
||||
|
||||
|
||||
def _claude_bin() -> str:
|
||||
"""Resolve the claude binary preflight should check.
|
||||
|
||||
Must match the binary the launcher actually spawns. The launcher hardcodes
|
||||
AgentLauncher.CLAUDE_BIN for the real Popen, so we prefer that; we only fall
|
||||
back to settings.claude_bin / a default if it is somehow unset. (Note: the
|
||||
container's ORCH_CLAUDE_BIN may point elsewhere; preflight follows the path
|
||||
that is genuinely executed, not the unused env override.)
|
||||
"""
|
||||
try:
|
||||
from .agents.launcher import AgentLauncher
|
||||
launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None)
|
||||
if launcher_bin and os.path.exists(launcher_bin):
|
||||
return launcher_bin
|
||||
# Launcher path not present -> fall back to configured/default.
|
||||
return launcher_bin or getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
|
||||
except Exception:
|
||||
return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
|
||||
|
||||
|
||||
def _run_version(bin_path: str) -> tuple[bool, str]:
|
||||
"""`claude --version` — proves the CLI runs without touching the API."""
|
||||
try:
|
||||
r = subprocess.run(
|
||||
[bin_path, "--version"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=_VERSION_TIMEOUT,
|
||||
)
|
||||
if r.returncode == 0:
|
||||
return True, (r.stdout or r.stderr or "").strip()[:120] or "ok"
|
||||
return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}"
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, f"--version timed out after {_VERSION_TIMEOUT}s"
|
||||
except FileNotFoundError:
|
||||
return False, "claude binary not found (FileNotFoundError)"
|
||||
except Exception as e: # pragma: no cover - defensive
|
||||
return False, f"--version error: {e}"
|
||||
|
||||
|
||||
def _compute() -> tuple[bool, str]:
|
||||
bin_path = _claude_bin()
|
||||
if not os.path.exists(bin_path):
|
||||
return False, f"CLAUDE_BIN not found: {bin_path}"
|
||||
return _run_version(bin_path)
|
||||
|
||||
|
||||
def check(force: bool = False) -> tuple[bool, str]:
|
||||
"""Return (ok, reason). Cached for preflight_cache_ttl seconds.
|
||||
|
||||
force=True bypasses the cache (used by the breaker half-open probe / tests).
|
||||
"""
|
||||
now = time.time()
|
||||
ttl = settings.preflight_cache_ttl
|
||||
if not force and _cache.ts > 0 and (now - _cache.ts) < ttl:
|
||||
return _cache.ok, _cache.reason
|
||||
ok, reason = _compute()
|
||||
_cache.ts = now
|
||||
_cache.ok = ok
|
||||
_cache.reason = reason
|
||||
if not ok:
|
||||
logger.warning(f"Preflight FAIL: {reason}")
|
||||
return ok, reason
|
||||
|
||||
|
||||
def reset_cache() -> None:
|
||||
"""Invalidate the cache (tests / forced recheck)."""
|
||||
_cache.ts = 0.0
|
||||
_cache.ok = False
|
||||
_cache.reason = "reset"
|
||||
246
src/queue_worker.py
Normal file
246
src/queue_worker.py
Normal file
@@ -0,0 +1,246 @@
|
||||
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
|
||||
|
||||
A single background thread polls the `jobs` table and spawns agents:
|
||||
|
||||
while running:
|
||||
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
|
||||
if not preflight.ok: sleep; continue # CLI/net down -> wait
|
||||
while count_running_jobs() < max_concurrency:
|
||||
job = claim_next_job() # atomic queued -> running (available_at-gated)
|
||||
if not job: break
|
||||
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
|
||||
sleep(poll_interval)
|
||||
|
||||
Resilience (ДОПОЛНЕНИЕ):
|
||||
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
|
||||
B/C. The launcher classifies failures (transient vs permanent) and applies
|
||||
backoff via available_at; the worker only needs to honour available_at
|
||||
(claim_next_job does) and react to transient outcomes via the breaker.
|
||||
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
|
||||
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
|
||||
|
||||
Design: plain daemon thread + threading.Event (the launcher already manages its
|
||||
own monitor/watchdog threads + blocking Popen).
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from .config import settings
|
||||
from .db import claim_next_job, count_running_jobs
|
||||
from .agents.launcher import launcher
|
||||
from . import preflight
|
||||
|
||||
logger = logging.getLogger("orchestrator.queue_worker")
|
||||
|
||||
|
||||
class CircuitBreaker:
|
||||
"""Trips after `threshold` consecutive transient failures.
|
||||
|
||||
States: closed -> (threshold transient) -> open -> (after pause) half-open
|
||||
-> (recovered) closed | (transient again) open.
|
||||
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
|
||||
guards the counters).
|
||||
"""
|
||||
|
||||
def __init__(self, threshold: int = None, pause_seconds: int = None):
|
||||
self.threshold = threshold if threshold is not None else settings.breaker_threshold
|
||||
self.pause_seconds = (
|
||||
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
|
||||
)
|
||||
self._lock = threading.Lock()
|
||||
self.state = "closed" # closed | open | half-open
|
||||
self.consecutive_transient = 0
|
||||
self.opened_at = 0.0
|
||||
self._notify = None # optional callable(message) for alerts
|
||||
|
||||
def set_notifier(self, fn):
|
||||
self._notify = fn
|
||||
|
||||
def record_transient(self):
|
||||
with self._lock:
|
||||
self.consecutive_transient += 1
|
||||
if self.state == "half-open":
|
||||
# Probe failed -> re-open.
|
||||
self._open("circuit re-opened: probe job hit transient again")
|
||||
elif self.consecutive_transient >= self.threshold and self.state == "closed":
|
||||
self._open(
|
||||
f"circuit OPEN: {self.consecutive_transient} consecutive "
|
||||
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
|
||||
)
|
||||
|
||||
def record_recovered(self):
|
||||
with self._lock:
|
||||
self.consecutive_transient = 0
|
||||
if self.state in ("half-open", "open"):
|
||||
self.state = "closed"
|
||||
logger.info("Circuit CLOSED: recovered")
|
||||
|
||||
def record_permanent(self):
|
||||
# A clean permanent (code-fault) failure breaks the transient streak.
|
||||
with self._lock:
|
||||
self.consecutive_transient = 0
|
||||
|
||||
def _open(self, msg: str):
|
||||
self.state = "open"
|
||||
self.opened_at = time.time()
|
||||
logger.warning(msg)
|
||||
if self._notify:
|
||||
try:
|
||||
self._notify(f"\U0001f534 {msg}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def allow_claim(self) -> bool:
|
||||
"""Return True if the worker may attempt to claim/launch a job now.
|
||||
|
||||
- closed -> yes.
|
||||
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
|
||||
- half-open -> yes (the single probe).
|
||||
"""
|
||||
with self._lock:
|
||||
if self.state == "closed":
|
||||
return True
|
||||
if self.state == "open":
|
||||
if (time.time() - self.opened_at) >= self.pause_seconds:
|
||||
self.state = "half-open"
|
||||
logger.info("Circuit HALF-OPEN: probing one job")
|
||||
return True
|
||||
return False
|
||||
# half-open: allow the probe.
|
||||
return True
|
||||
|
||||
def snapshot(self) -> dict:
|
||||
with self._lock:
|
||||
remaining = 0
|
||||
if self.state == "open":
|
||||
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
|
||||
return {
|
||||
"state": self.state,
|
||||
"consecutive_transient": self.consecutive_transient,
|
||||
"pause_remaining_s": remaining,
|
||||
}
|
||||
|
||||
|
||||
class QueueWorker:
|
||||
"""Background worker that drains the persistent job queue (with resilience)."""
|
||||
|
||||
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
|
||||
breaker: CircuitBreaker = None):
|
||||
self.max_concurrency = (
|
||||
max_concurrency if max_concurrency is not None else settings.max_concurrency
|
||||
)
|
||||
self.poll_interval = (
|
||||
poll_interval if poll_interval is not None else settings.queue_poll_interval
|
||||
)
|
||||
self.breaker = breaker or CircuitBreaker()
|
||||
self.last_preflight_ok = True
|
||||
self.last_preflight_reason = "not checked"
|
||||
self._stop = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
|
||||
# --- circuit breaker outcome callback wired into the launcher ----------
|
||||
def _on_outcome(self, transient: bool, recovered: bool):
|
||||
if recovered:
|
||||
self.breaker.record_recovered()
|
||||
elif transient:
|
||||
self.breaker.record_transient()
|
||||
else:
|
||||
self.breaker.record_permanent()
|
||||
|
||||
def _drain_once(self):
|
||||
"""Claim and launch jobs until concurrency is full or the queue is empty.
|
||||
|
||||
Gated by the circuit breaker and preflight: if the breaker is open (and
|
||||
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
|
||||
queued and no CLI/tokens are touched.
|
||||
"""
|
||||
if not self.breaker.allow_claim():
|
||||
return
|
||||
ok, reason = preflight.check()
|
||||
self.last_preflight_ok = ok
|
||||
self.last_preflight_reason = reason
|
||||
if not ok:
|
||||
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
|
||||
return
|
||||
|
||||
# In half-open we only probe a single job, regardless of max_concurrency.
|
||||
half_open = self.breaker.snapshot()["state"] == "half-open"
|
||||
launched = 0
|
||||
while not self._stop.is_set():
|
||||
if half_open and launched >= 1:
|
||||
return
|
||||
if count_running_jobs() >= self.max_concurrency:
|
||||
return
|
||||
job = claim_next_job()
|
||||
if not job:
|
||||
return
|
||||
launched += 1
|
||||
try:
|
||||
run_id = launcher.launch_job(job)
|
||||
logger.info(
|
||||
f"Worker launched job {job['id']} ({job['agent']}, "
|
||||
f"repo {job['repo']}) -> run_id={run_id}"
|
||||
)
|
||||
except Exception as e:
|
||||
# Launch itself failed (e.g. repo missing): treat as a permanent
|
||||
# launch error so the job does not wedge as 'running' forever.
|
||||
logger.error(f"Worker failed to launch job {job['id']}: {e}")
|
||||
try:
|
||||
from .db import get_job, mark_job
|
||||
|
||||
j = get_job(job["id"])
|
||||
attempts = j.get("attempts", 0) if j else 0
|
||||
max_attempts = j.get("max_attempts", 2) if j else 2
|
||||
if attempts < max_attempts:
|
||||
mark_job(job["id"], "queued", error=f"launch error: {e}")
|
||||
else:
|
||||
mark_job(job["id"], "failed", error=f"launch error: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _run(self):
|
||||
logger.info(
|
||||
f"Queue worker started (max_concurrency={self.max_concurrency}, "
|
||||
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
|
||||
)
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
self._drain_once()
|
||||
except Exception as e:
|
||||
logger.error(f"Queue worker loop error: {e}")
|
||||
self._stop.wait(self.poll_interval)
|
||||
logger.info("Queue worker stopped")
|
||||
|
||||
def start(self):
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
# Wire breaker alerting + launcher outcome callback.
|
||||
try:
|
||||
from .notifications import send_telegram
|
||||
self.breaker.set_notifier(send_telegram)
|
||||
except Exception:
|
||||
pass
|
||||
launcher.on_outcome = self._on_outcome
|
||||
self._stop.clear()
|
||||
self._thread = threading.Thread(
|
||||
target=self._run, name="queue-worker", daemon=True
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self, timeout: float = 5.0):
|
||||
self._stop.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=timeout)
|
||||
|
||||
def status(self) -> dict:
|
||||
"""Resilience snapshot for /queue."""
|
||||
return {
|
||||
"breaker": self.breaker.snapshot(),
|
||||
"preflight_ok": self.last_preflight_ok,
|
||||
"preflight_reason": self.last_preflight_reason,
|
||||
}
|
||||
|
||||
|
||||
# Module-level singleton used by the FastAPI lifespan.
|
||||
worker = QueueWorker()
|
||||
425
src/stage_engine.py
Normal file
425
src/stage_engine.py
Normal file
@@ -0,0 +1,425 @@
|
||||
"""Unified stage engine (ORCH-4 / M-3).
|
||||
|
||||
Single source of truth for "an agent finished / a human approved -> run the
|
||||
stage's quality gate and either advance the pipeline or roll it back".
|
||||
|
||||
Before ORCH-4 this logic was duplicated in two places that had silently
|
||||
diverged:
|
||||
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
|
||||
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
|
||||
rollback+retry, architect conflict rollback) — but it picked the next agent
|
||||
with get_agent_for_stage(next_stage), which is WRONG.
|
||||
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
|
||||
check_review_approved PR-by-branch dispatch and used the CORRECT
|
||||
get_agent_for_stage(current_stage)).
|
||||
|
||||
This module merges both into one sync `advance_stage(...)`. launcher calls it
|
||||
directly; the plane webhook calls it through asyncio.to_thread so there is
|
||||
exactly one implementation.
|
||||
|
||||
Agent-selection bug fix (ORCH-4):
|
||||
stages.py defines `agent` as "the agent to launch when advancing FROM this
|
||||
stage". So when advancing current -> next, the correct agent to launch is
|
||||
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
|
||||
stage (e.g. analysis->architecture launched 'developer' instead of
|
||||
'architect'). plane and gitea already used current_stage; we unify on that.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .db import get_db, update_task_stage, enqueue_job
|
||||
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from .git_worktree import get_worktree_path
|
||||
from .qg.checks import QG_CHECKS
|
||||
from .notifications import (
|
||||
notify_stage_change,
|
||||
notify_qg_failure,
|
||||
notify_approve_requested,
|
||||
send_telegram,
|
||||
)
|
||||
from .plane_sync import (
|
||||
notify_stage_change as plane_notify_stage,
|
||||
notify_qg_failure as plane_notify_qg,
|
||||
add_comment as plane_add_comment,
|
||||
set_issue_in_review,
|
||||
set_issue_needs_input,
|
||||
set_issue_in_progress,
|
||||
set_issue_blocked,
|
||||
)
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.stage_engine")
|
||||
|
||||
MAX_DEVELOPER_RETRIES = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class AdvanceResult:
|
||||
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
|
||||
|
||||
advanced: bool = False
|
||||
from_stage: str | None = None
|
||||
to_stage: str | None = None
|
||||
enqueued_agent: str | None = None
|
||||
enqueued_job_id: int | None = None
|
||||
qg_name: str | None = None
|
||||
qg_passed: bool | None = None
|
||||
qg_reason: str | None = None
|
||||
rolled_back_to: str | None = None
|
||||
alerted: bool = False
|
||||
note: str | None = None
|
||||
notes: list = field(default_factory=list)
|
||||
|
||||
|
||||
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
|
||||
"""Dispatch a quality-gate check to the right signature and run it.
|
||||
|
||||
Signatures (unified from launcher + plane):
|
||||
- check_ci_green / check_tests_local -> (repo, branch)
|
||||
- check_review_approved -> (repo, pr_number) [PR found by branch]
|
||||
- everything else (artifact checks) -> (repo, work_item_id, branch)
|
||||
|
||||
Returns (passed: bool, reason: str).
|
||||
"""
|
||||
check_fn = QG_CHECKS.get(qg_name)
|
||||
if not check_fn:
|
||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||
return False, f"Unknown QG: {qg_name}"
|
||||
|
||||
if qg_name in ("check_ci_green", "check_tests_local"):
|
||||
# (repo, branch) — already worktree-aware.
|
||||
return check_fn(repo, branch)
|
||||
|
||||
if qg_name == "check_review_approved":
|
||||
# Special case kept from plane: find the open PR for this branch via
|
||||
# Gitea, then check it; fall back to a file-based review marker.
|
||||
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
|
||||
|
||||
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
|
||||
# check reads from the task worktree (ORCH-2 / S-4).
|
||||
return check_fn(repo, work_item_id or "", branch)
|
||||
|
||||
|
||||
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
|
||||
"""check_review_approved dispatch preserved from plane._try_advance_stage.
|
||||
|
||||
Finds the open PR whose head ref == branch via the Gitea API and runs
|
||||
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
|
||||
file-based review marker (12-review.md / 09-review.md) like the original.
|
||||
"""
|
||||
import httpx as _httpx
|
||||
|
||||
owner = settings.gitea_owner
|
||||
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
try:
|
||||
resp = _httpx.get(url, headers=headers, timeout=10)
|
||||
prs = resp.json()
|
||||
pr_number = None
|
||||
for pr in prs:
|
||||
if pr.get("head", {}).get("ref") == branch:
|
||||
pr_number = pr["number"]
|
||||
break
|
||||
if pr_number:
|
||||
return check_fn(repo, pr_number)
|
||||
# No open PR but a review file may exist — check file-based.
|
||||
wt = get_worktree_path(repo, branch)
|
||||
if not os.path.isdir(wt):
|
||||
wt = os.path.join(settings.repos_dir, repo)
|
||||
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||
if os.path.isfile(review_path) or os.path.isfile(review_path2):
|
||||
return True, "Review file exists (file-based approval)"
|
||||
return False, "No open PR found and no review file"
|
||||
except Exception as e:
|
||||
return False, f"Error finding PR: {e}"
|
||||
|
||||
|
||||
def _developer_retry_count(task_id: int) -> int:
|
||||
"""How many developer runs have already happened for this task."""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,),
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def advance_stage(
|
||||
task_id: int,
|
||||
current_stage: str,
|
||||
repo: str,
|
||||
work_item_id: str,
|
||||
branch: str,
|
||||
finished_agent: str | None = None,
|
||||
) -> AdvanceResult:
|
||||
"""Run the current stage's quality gate and advance / roll back the pipeline.
|
||||
|
||||
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
|
||||
the async plane webhook calls it via asyncio.to_thread.
|
||||
|
||||
Args:
|
||||
task_id: tasks.id
|
||||
current_stage: the stage the task is currently in
|
||||
repo: repository name
|
||||
work_item_id: Plane work item id (may be "" / None)
|
||||
branch: feature branch
|
||||
finished_agent: the agent that just finished (launcher path). Drives the
|
||||
approved/REQUEST_CHANGES/tester/architect branches. In the
|
||||
plane webhook path it is None, so those agent-specific
|
||||
branches simply do not trigger (matches old plane behavior).
|
||||
|
||||
Returns AdvanceResult describing what happened.
|
||||
"""
|
||||
result = AdvanceResult(from_stage=current_stage)
|
||||
agent = finished_agent
|
||||
try:
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
result.qg_name = qg_name
|
||||
result.to_stage = next_stage
|
||||
|
||||
if not next_stage:
|
||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||
result.note = "terminal"
|
||||
return result
|
||||
|
||||
# --- Quality gate ----------------------------------------------------
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||
if qg_name == "check_analysis_approved":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
|
||||
elif qg_name:
|
||||
# QG name set but not registered — do not advance (launcher behavior).
|
||||
result.note = f"qg '{qg_name}' not in registry"
|
||||
return result
|
||||
|
||||
# --- Advance ---------------------------------------------------------
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
result.advanced = True
|
||||
logger.info(
|
||||
f"Task {task_id}: {current_stage} -> {next_stage} "
|
||||
f"(auto-advance after {agent})"
|
||||
)
|
||||
|
||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||
next_agent = get_agent_for_stage(current_stage)
|
||||
if next_agent:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\n"
|
||||
f"Branch: {branch}\nStage: {next_stage}"
|
||||
)
|
||||
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = next_agent
|
||||
result.enqueued_job_id = new_job_id
|
||||
logger.info(
|
||||
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
|
||||
result.note = f"error: {e}"
|
||||
return result
|
||||
|
||||
|
||||
def _handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||
):
|
||||
"""Analyst approved-flow (launcher only).
|
||||
|
||||
Only triggers when the analyst just finished (agent == 'analyst') in the
|
||||
launcher path. Decides between: artifacts ready -> In Review + request
|
||||
:approved:; questions file -> Needs Input; otherwise a warning comment.
|
||||
This gate never advances on its own (human approval does that via the plane
|
||||
webhook), matching the original launcher behavior.
|
||||
"""
|
||||
result.qg_name = "check_analysis_approved"
|
||||
result.note = "analysis-approval-gate"
|
||||
if not (agent == "analyst" and work_item_id):
|
||||
return
|
||||
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if not files_check:
|
||||
return
|
||||
|
||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review, ask for :approved:.
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
||||
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
result.note = "analysis-in-review"
|
||||
logger.info(
|
||||
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
||||
)
|
||||
return
|
||||
|
||||
questions_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/01-questions.md",
|
||||
)
|
||||
if os.path.isfile(questions_path):
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
|
||||
)
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
result.note = "analysis-needs-input"
|
||||
return
|
||||
|
||||
# No artifacts and no questions.
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
|
||||
)
|
||||
result.note = "analysis-empty"
|
||||
|
||||
|
||||
def _handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result: AdvanceResult,
|
||||
):
|
||||
"""All rollback/retry branches from the original launcher, preserved verbatim.
|
||||
|
||||
Only fire on the launcher path (finished_agent is set). The webhook path
|
||||
passes finished_agent=None, so none of these agent-specific branches trigger
|
||||
— that matches the old plane behavior (it just reported the QG failure).
|
||||
"""
|
||||
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
send_telegram(
|
||||
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
|
||||
f"Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
|
||||
if agent == "tester" and qg_name == "check_tests_passed":
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
|
||||
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
)
|
||||
retry_count = _developer_retry_count(task_id)
|
||||
if retry_count < MAX_DEVELOPER_RETRIES:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "developer"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
|
||||
)
|
||||
else:
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(
|
||||
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
|
||||
f"retries. Manual intervention needed."
|
||||
)
|
||||
result.alerted = True
|
||||
|
||||
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
|
||||
if agent == "architect" and qg_name == "check_architecture_done":
|
||||
conflict_path = os.path.join(
|
||||
get_worktree_path(repo, branch),
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md",
|
||||
)
|
||||
if os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
result.rolled_back_to = "analysis"
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
|
||||
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
result.enqueued_agent = "analyst"
|
||||
result.enqueued_job_id = new_job
|
||||
logger.info(
|
||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
@@ -10,7 +10,7 @@ import httpx
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..stages import get_next_stage, get_agent_for_stage
|
||||
from ..qg.checks import check_ci_green, check_review_approved
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -123,8 +123,8 @@ async def handle_push(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
|
||||
@@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
else:
|
||||
@@ -272,8 +272,8 @@ async def handle_pr(payload: dict):
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})")
|
||||
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
else:
|
||||
@@ -297,8 +297,8 @@ async def handle_pr(payload: dict):
|
||||
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
|
||||
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||
)
|
||||
run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})")
|
||||
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to relaunch developer: {e}")
|
||||
else:
|
||||
|
||||
@@ -14,6 +14,7 @@ from ..db import (
|
||||
get_task_by_plane_id,
|
||||
get_next_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
)
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
@@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||
if task_row:
|
||||
task_id = task_row[0]
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
||||
run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: launched analyst (run_id={run_id})")
|
||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||
# Post start comment to Plane
|
||||
from ..plane_sync import add_comment as _add_comment
|
||||
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
|
||||
@@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
|
||||
f"Reason: {reason}\nRevise and improve."
|
||||
)
|
||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _plane_comment
|
||||
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
|
||||
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
|
||||
else:
|
||||
# Rollback to previous stage
|
||||
prev_stage = get_previous_stage(current_stage)
|
||||
@@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
f"Read the latest comment in Plane and revise your artifacts.\n"
|
||||
f"Answer: {comment_body[:500]}"
|
||||
)
|
||||
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||
from ..plane_sync import add_comment as _pc2
|
||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
|
||||
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check issue state: {e}")
|
||||
@@ -317,81 +318,30 @@ async def handle_comment(data: dict, project_id: str = ""):
|
||||
async def _try_advance_stage(
|
||||
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
|
||||
):
|
||||
"""Run QG check for current stage and advance if passed."""
|
||||
qg_name = get_qg_for_stage(current_stage)
|
||||
next_stage = get_next_stage(current_stage)
|
||||
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
|
||||
|
||||
if not next_stage:
|
||||
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
|
||||
return
|
||||
The QG dispatch (including the check_review_approved PR-by-branch logic) and
|
||||
the advance/launch logic now live in src/stage_engine.advance_stage(), which
|
||||
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
||||
is exactly one implementation shared with the launcher.
|
||||
|
||||
# Run QG check if one is required
|
||||
if qg_name:
|
||||
qg_func = QG_CHECKS.get(qg_name)
|
||||
if not qg_func:
|
||||
logger.error(f"QG function '{qg_name}' not found in registry")
|
||||
return
|
||||
finished_agent is None on this webhook path (a human :approved: comment, not
|
||||
a finished agent), so the agent-specific rollback branches inside the engine
|
||||
intentionally do not trigger — identical to the old plane behavior, which
|
||||
only ran the QG and either advanced or reported the failure.
|
||||
"""
|
||||
import asyncio
|
||||
from ..stage_engine import advance_stage
|
||||
|
||||
# Determine args based on QG function
|
||||
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
|
||||
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
|
||||
passed, reason = qg_func(repo, work_item_id, branch)
|
||||
elif qg_name in ("check_ci_green", "check_tests_local"):
|
||||
passed, reason = qg_func(repo, branch)
|
||||
elif qg_name == "check_review_approved":
|
||||
# Find PR number by branch via Gitea API
|
||||
import httpx as _httpx
|
||||
from ..config import settings as _s
|
||||
_owner = _s.gitea_owner
|
||||
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
|
||||
_headers = {"Authorization": f"token {_s.gitea_token}"}
|
||||
try:
|
||||
_resp = _httpx.get(_url, headers=_headers, timeout=10)
|
||||
_prs = _resp.json()
|
||||
_pr_number = None
|
||||
for _pr in _prs:
|
||||
if _pr.get("head", {}).get("ref") == branch:
|
||||
_pr_number = _pr["number"]
|
||||
break
|
||||
if _pr_number:
|
||||
passed, reason = qg_func(repo, _pr_number)
|
||||
else:
|
||||
# No open PR but review file exists — check file-based
|
||||
import os
|
||||
from ..git_worktree import get_worktree_path as _gwp
|
||||
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
|
||||
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
|
||||
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
|
||||
passed, reason = True, "Review file exists (file-based approval)"
|
||||
else:
|
||||
passed, reason = False, "No open PR found and no review file"
|
||||
except Exception as _e:
|
||||
passed, reason = False, f"Error finding PR: {_e}"
|
||||
else:
|
||||
passed, reason = False, f"Unknown QG: {qg_name}"
|
||||
|
||||
if not passed:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
return
|
||||
|
||||
# Advance stage
|
||||
update_task_stage(task_id, next_stage)
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
|
||||
# Launch agent associated with the current stage's transition
|
||||
agent = get_agent_for_stage(current_stage)
|
||||
if agent:
|
||||
try:
|
||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
|
||||
run_id = launcher.launch(agent, repo, task_desc, task_id=task_id)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
|
||||
logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
|
||||
logger.error(f"Agent launch failed: {e}")
|
||||
await asyncio.to_thread(
|
||||
advance_stage,
|
||||
task_id,
|
||||
current_stage,
|
||||
repo,
|
||||
work_item_id,
|
||||
branch,
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
async def _create_gitea_branch(repo: str, branch: str):
|
||||
|
||||
@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
|
||||
the YAML frontmatter only (no fragile substring matching).
|
||||
"""
|
||||
import os
|
||||
import signal
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
from src.agents.launcher import AgentLauncher
|
||||
from src.qg.checks import check_reviewer_verdict
|
||||
from src.config import settings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
|
||||
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-7 (M-4): dead code removed
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDeadCodeRemoved:
|
||||
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
|
||||
removed. _ensure_pr (used by the auto-advance path) must stay."""
|
||||
|
||||
def test_auto_merge_pr_is_gone(self):
|
||||
assert not hasattr(AgentLauncher, "_auto_merge_pr")
|
||||
|
||||
def test_ensure_pr_still_present(self):
|
||||
assert hasattr(AgentLauncher, "_ensure_pr")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-7 (M-2): configurable timeout + per-agent override
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestResolveTimeout:
|
||||
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
|
||||
|
||||
def test_default_when_no_override(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
|
||||
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||
assert AgentLauncher._resolve_timeout(None) == 1800
|
||||
|
||||
def test_override_for_specific_agent(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(
|
||||
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
|
||||
)
|
||||
assert AgentLauncher._resolve_timeout("reviewer") == 3600
|
||||
assert AgentLauncher._resolve_timeout("architect") == 2700
|
||||
# an agent not in the override map falls back to the default
|
||||
assert AgentLauncher._resolve_timeout("developer") == 1800
|
||||
|
||||
def test_malformed_override_falls_back_to_default(self, monkeypatch):
|
||||
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
|
||||
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
|
||||
# must not raise, must return the default
|
||||
assert AgentLauncher._resolve_timeout("reviewer") == 1800
|
||||
|
||||
|
||||
class TestWatchdogGracefulKill:
|
||||
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
|
||||
and ProcessLookupError tolerance. The OS process is fully faked: we record the
|
||||
signals sent and decide liveness from a script, so no real process is touched."""
|
||||
|
||||
def _patch_db(self, monkeypatch):
|
||||
"""Stub get_db so _record_kill does not need a real DB."""
|
||||
class _Conn:
|
||||
def execute(self, *a, **k):
|
||||
return self
|
||||
def commit(self):
|
||||
pass
|
||||
def close(self):
|
||||
pass
|
||||
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
|
||||
|
||||
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
|
||||
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
sent.append(sig)
|
||||
# signal 0 (liveness probe) -> always alive; never raise
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGTERM in sent
|
||||
assert signal.SIGKILL in sent
|
||||
# SIGTERM must come before SIGKILL
|
||||
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
|
||||
|
||||
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
|
||||
"""Process dies during the grace window -> SIGKILL is NOT sent."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
state = {"alive": True, "probes": 0}
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
if sig == 0:
|
||||
state["probes"] += 1
|
||||
# die on the 2nd liveness probe (within grace)
|
||||
if state["probes"] >= 2:
|
||||
raise ProcessLookupError
|
||||
return None
|
||||
sent.append(sig)
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGTERM in sent
|
||||
assert signal.SIGKILL not in sent
|
||||
|
||||
def test_already_dead_before_sigterm(self, monkeypatch):
|
||||
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
|
||||
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
|
||||
self._patch_db(monkeypatch)
|
||||
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
|
||||
|
||||
sent = []
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
if sig == signal.SIGTERM:
|
||||
raise ProcessLookupError
|
||||
sent.append(sig)
|
||||
return None
|
||||
|
||||
recorded = {"called": False}
|
||||
monkeypatch.setattr(
|
||||
AgentLauncher, "_record_kill",
|
||||
staticmethod(lambda rid: recorded.__setitem__("called", True)),
|
||||
)
|
||||
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
|
||||
|
||||
launcher = AgentLauncher()
|
||||
# must not raise
|
||||
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
|
||||
|
||||
assert signal.SIGKILL not in sent
|
||||
assert recorded["called"] is False
|
||||
|
||||
304
tests/test_queue.py
Normal file
304
tests/test_queue.py
Normal file
@@ -0,0 +1,304 @@
|
||||
"""Tests for ORCH-1 (F-2b) persistent job queue.
|
||||
|
||||
Covers:
|
||||
- enqueue_job -> claim_next_job -> mark_job lifecycle
|
||||
- claim_next_job atomicity (no double-dispatch of the same job)
|
||||
- retry: fail -> requeue while attempts < max_attempts, then failed
|
||||
- requeue_running_jobs (queue-recovery)
|
||||
- count_running_jobs / job_status_counts / recent_jobs
|
||||
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
|
||||
|
||||
The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
|
||||
tests, and the launcher finalize logic is exercised directly via mark_job.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
# Override env before importing app modules (same convention as test_qg.py).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
import src.db as db
|
||||
from src.db import (
|
||||
init_db,
|
||||
enqueue_job,
|
||||
claim_next_job,
|
||||
mark_job,
|
||||
count_running_jobs,
|
||||
requeue_running_jobs,
|
||||
get_job,
|
||||
job_status_counts,
|
||||
recent_jobs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
"""Point the DB at a fresh per-test sqlite file and init the schema."""
|
||||
dbfile = tmp_path / "queue.db"
|
||||
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# enqueue / claim / mark lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestLifecycle:
|
||||
def test_enqueue_creates_queued_job(self):
|
||||
jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["agent"] == "analyst"
|
||||
assert job["repo"] == "enduro-trails"
|
||||
assert job["task_content"] == "task body"
|
||||
assert job["task_id"] == 7
|
||||
assert job["attempts"] == 0
|
||||
assert job["max_attempts"] == 2
|
||||
|
||||
def test_claim_marks_running_and_increments_attempts(self):
|
||||
jid = enqueue_job("developer", "repo")
|
||||
claimed = claim_next_job()
|
||||
assert claimed is not None
|
||||
assert claimed["id"] == jid
|
||||
assert claimed["status"] == "running"
|
||||
assert claimed["attempts"] == 1
|
||||
assert count_running_jobs() == 1
|
||||
|
||||
def test_claim_empty_queue_returns_none(self):
|
||||
assert claim_next_job() is None
|
||||
|
||||
def test_claim_is_fifo(self):
|
||||
a = enqueue_job("analyst", "r")
|
||||
b = enqueue_job("developer", "r")
|
||||
assert claim_next_job()["id"] == a
|
||||
assert claim_next_job()["id"] == b
|
||||
|
||||
def test_mark_done(self):
|
||||
jid = enqueue_job("tester", "r")
|
||||
claim_next_job()
|
||||
mark_job(jid, "done", run_id=42)
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "done"
|
||||
assert job["run_id"] == 42
|
||||
assert job["finished_at"] is not None
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
def test_mark_failed_records_error(self):
|
||||
jid = enqueue_job("tester", "r")
|
||||
claim_next_job()
|
||||
mark_job(jid, "failed", run_id=9, error="boom")
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "failed"
|
||||
assert job["error"] == "boom"
|
||||
assert job["finished_at"] is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# claim atomicity — no double dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestClaimAtomicity:
|
||||
def test_single_job_claimed_once(self):
|
||||
jid = enqueue_job("analyst", "r")
|
||||
first = claim_next_job()
|
||||
second = claim_next_job()
|
||||
assert first["id"] == jid
|
||||
assert second is None # already running, not re-dispatched
|
||||
|
||||
def test_concurrent_claims_no_duplicate(self):
|
||||
"""Many enqueued jobs claimed from parallel threads -> each claimed once."""
|
||||
import threading
|
||||
|
||||
n = 20
|
||||
for _ in range(n):
|
||||
enqueue_job("developer", "r")
|
||||
|
||||
claimed_ids = []
|
||||
lock = threading.Lock()
|
||||
|
||||
def grab():
|
||||
while True:
|
||||
job = claim_next_job()
|
||||
if job is None:
|
||||
return
|
||||
with lock:
|
||||
claimed_ids.append(job["id"])
|
||||
|
||||
threads = [threading.Thread(target=grab) for _ in range(8)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(claimed_ids) == n
|
||||
assert len(set(claimed_ids)) == n # no id claimed twice
|
||||
assert count_running_jobs() == n
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# retry semantics (mirrors launcher._finalize_job logic)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRetry:
|
||||
def test_fail_requeues_while_under_max(self):
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
job = claim_next_job() # attempts=1
|
||||
assert job["attempts"] == 1
|
||||
# attempts(1) < max(2) -> requeue
|
||||
mark_job(jid, "queued", error="exit 1")
|
||||
j = get_job(jid)
|
||||
assert j["status"] == "queued"
|
||||
assert j["error"] == "exit 1"
|
||||
assert j["started_at"] is None # requeue clears started_at
|
||||
|
||||
def test_fail_fails_when_max_reached(self):
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job() # attempts=1 -> requeue
|
||||
mark_job(jid, "queued")
|
||||
job2 = claim_next_job() # attempts=2
|
||||
assert job2["attempts"] == 2
|
||||
# attempts(2) >= max(2) -> failed
|
||||
mark_job(jid, "failed", error="exit 1")
|
||||
assert get_job(jid)["status"] == "failed"
|
||||
|
||||
def test_finalize_job_done(self):
|
||||
"""launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
|
||||
from src.agents.launcher import AgentLauncher
|
||||
jid = enqueue_job("analyst", "r")
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
|
||||
assert get_job(jid)["status"] == "done"
|
||||
|
||||
def test_finalize_job_requeue_then_fail(self, monkeypatch):
|
||||
from src.agents.launcher import AgentLauncher
|
||||
# Silence telegram side-effect.
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
lr = AgentLauncher()
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
|
||||
claim_next_job() # attempts=1
|
||||
lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
|
||||
assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue
|
||||
|
||||
claim_next_job() # attempts=2
|
||||
lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
|
||||
assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# queue-recovery
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRequeueRunning:
|
||||
def test_requeue_running_jobs(self):
|
||||
a = enqueue_job("analyst", "r")
|
||||
b = enqueue_job("developer", "r")
|
||||
claim_next_job() # a -> running
|
||||
claim_next_job() # b -> running
|
||||
assert count_running_jobs() == 2
|
||||
n = requeue_running_jobs()
|
||||
assert n == 2
|
||||
assert count_running_jobs() == 0
|
||||
assert get_job(a)["status"] == "queued"
|
||||
assert get_job(b)["status"] == "queued"
|
||||
|
||||
def test_requeue_preserves_attempts(self):
|
||||
jid = enqueue_job("analyst", "r")
|
||||
claim_next_job() # attempts=1
|
||||
requeue_running_jobs()
|
||||
assert get_job(jid)["attempts"] == 1 # not reset
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# observability helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestObservability:
|
||||
def test_status_counts(self):
|
||||
enqueue_job("analyst", "r") # stays queued
|
||||
enqueue_job("developer", "r") # first claimed -> running (FIFO)
|
||||
claim_next_job()
|
||||
counts = job_status_counts()
|
||||
assert counts["running"] == 1
|
||||
assert counts["queued"] == 1
|
||||
assert counts["done"] == 0
|
||||
assert counts["failed"] == 0
|
||||
|
||||
def test_recent_jobs_desc(self):
|
||||
ids = [enqueue_job("analyst", "r") for _ in range(3)]
|
||||
recent = recent_jobs(10)
|
||||
assert [r["id"] for r in recent] == sorted(ids, reverse=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestWorkerConcurrency:
|
||||
@pytest.fixture(autouse=True)
|
||||
def _ok_preflight(self, monkeypatch):
|
||||
# ORCH-1 resilience: the worker gates claims behind preflight; in tests there
|
||||
# is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
|
||||
|
||||
def test_worker_respects_max_concurrency(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
launched = []
|
||||
|
||||
def fake_launch_job(job):
|
||||
# Simulate a long-running agent: the job stays 'running' (we do NOT
|
||||
# mark it done), so the slot remains occupied.
|
||||
launched.append(job["id"])
|
||||
return 100 + job["id"]
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||
|
||||
for _ in range(5):
|
||||
enqueue_job("developer", "r")
|
||||
|
||||
w = QueueWorker(max_concurrency=2, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# Only max_concurrency jobs may be launched / running at once.
|
||||
assert len(launched) == 2
|
||||
assert count_running_jobs() == 2
|
||||
|
||||
def test_worker_drains_as_slots_free(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
def fake_launch_job(job):
|
||||
# Immediately complete the job so the slot frees for the next claim.
|
||||
mark_job(job["id"], "done", run_id=job["id"])
|
||||
return job["id"]
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
|
||||
|
||||
for _ in range(4):
|
||||
enqueue_job("analyst", "r")
|
||||
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# With instant completion and concurrency 1, one drain pass empties the queue.
|
||||
assert job_status_counts()["done"] == 4
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
|
||||
from src.queue_worker import QueueWorker
|
||||
|
||||
def boom(job):
|
||||
raise RuntimeError("repo missing")
|
||||
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
|
||||
enqueue_job("developer", "r", max_attempts=1)
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
|
||||
w._drain_once()
|
||||
|
||||
# attempts=1 >= max_attempts=1 -> failed, not stuck running.
|
||||
assert count_running_jobs() == 0
|
||||
counts = job_status_counts()
|
||||
assert counts["failed"] == 1
|
||||
295
tests/test_resilience.py
Normal file
295
tests/test_resilience.py
Normal file
@@ -0,0 +1,295 @@
|
||||
"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
|
||||
|
||||
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
|
||||
are mocked. DB is a fresh per-test sqlite file.
|
||||
"""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
|
||||
import src.db as db
|
||||
from src.db import (
|
||||
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
|
||||
mark_job_transient,
|
||||
)
|
||||
from src import preflight, error_classifier
|
||||
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
|
||||
from src.queue_worker import QueueWorker, CircuitBreaker
|
||||
from src.agents.launcher import AgentLauncher
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
|
||||
init_db()
|
||||
preflight.reset_cache()
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# A. Preflight
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestPreflight:
|
||||
def test_fail_when_bin_missing(self, monkeypatch):
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
|
||||
ok, reason = preflight.check(force=True)
|
||||
assert ok is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("#!/bin/sh\necho v1\n")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
|
||||
ok, reason = preflight.check(force=True)
|
||||
assert ok is True
|
||||
|
||||
def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("x")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
|
||||
|
||||
calls = {"n": 0}
|
||||
|
||||
def counting_version(b):
|
||||
calls["n"] += 1
|
||||
return True, "ok"
|
||||
|
||||
monkeypatch.setattr(preflight, "_run_version", counting_version)
|
||||
preflight.reset_cache()
|
||||
preflight.check() # first -> runs version
|
||||
preflight.check() # cached -> no extra version call
|
||||
preflight.check()
|
||||
assert calls["n"] == 1
|
||||
|
||||
def test_force_bypasses_cache(self, monkeypatch, tmp_path):
|
||||
fake_bin = tmp_path / "claude"
|
||||
fake_bin.write_text("x")
|
||||
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
|
||||
calls = {"n": 0}
|
||||
monkeypatch.setattr(preflight, "_run_version",
|
||||
lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1])
|
||||
preflight.reset_cache()
|
||||
preflight.check()
|
||||
preflight.check(force=True)
|
||||
assert calls["n"] == 2
|
||||
|
||||
def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
|
||||
# Preflight FAIL -> job stays queued, launch_job never called.
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check",
|
||||
lambda *a, **k: (False, "down"))
|
||||
called = {"launch": False}
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
|
||||
lambda job: called.__setitem__("launch", True))
|
||||
jid = enqueue_job("analyst", "r")
|
||||
QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
|
||||
assert called["launch"] is False
|
||||
assert get_job(jid)["status"] == "queued"
|
||||
assert count_running_jobs() == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# B. Error classifier
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestClassifier:
|
||||
@pytest.mark.parametrize("text", [
|
||||
"Error: 429 Too Many Requests",
|
||||
"anthropic rate limit exceeded",
|
||||
"overloaded_error: server is overloaded",
|
||||
"API quota exhausted",
|
||||
"503 Service Unavailable",
|
||||
"connection reset by peer",
|
||||
])
|
||||
def test_transient_patterns(self, text):
|
||||
assert classify_text(text) == "transient"
|
||||
|
||||
@pytest.mark.parametrize("text", [
|
||||
"Traceback: KeyError 'foo'",
|
||||
"SyntaxError: invalid syntax",
|
||||
"assertion failed in test",
|
||||
"",
|
||||
])
|
||||
def test_permanent_patterns(self, text):
|
||||
assert classify_text(text) == "permanent"
|
||||
|
||||
def test_retry_after_header(self):
|
||||
assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42
|
||||
|
||||
def test_retry_after_json(self):
|
||||
assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7
|
||||
|
||||
def test_retry_after_absent(self):
|
||||
assert parse_retry_after("just an error") is None
|
||||
|
||||
def test_classify_log_file(self, tmp_path):
|
||||
p = tmp_path / "run.log"
|
||||
p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
|
||||
kind, ra = classify_log_file(str(p))
|
||||
assert kind == "transient"
|
||||
assert ra == 30
|
||||
|
||||
def test_classify_missing_file_is_permanent(self):
|
||||
kind, ra = classify_log_file("/no/such/log")
|
||||
assert kind == "permanent"
|
||||
assert ra is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# C. Backoff + available_at gating
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestBackoff:
|
||||
def test_backoff_grows_exponentially(self):
|
||||
lr = AgentLauncher()
|
||||
# base=10, cap=600 (defaults)
|
||||
b1 = lr._backoff_seconds(1)
|
||||
b2 = lr._backoff_seconds(2)
|
||||
b3 = lr._backoff_seconds(3)
|
||||
assert b1 == 20 # 2^1*10
|
||||
assert b2 == 40 # 2^2*10
|
||||
assert b3 == 80 # 2^3*10
|
||||
assert b2 > b1 and b3 > b2
|
||||
|
||||
def test_backoff_capped(self):
|
||||
lr = AgentLauncher()
|
||||
assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds
|
||||
|
||||
def test_retry_after_respected_when_larger(self):
|
||||
lr = AgentLauncher()
|
||||
# transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
|
||||
assert lr._backoff_seconds(1, retry_after=120) == 120
|
||||
|
||||
def test_retry_after_ignored_when_smaller(self):
|
||||
lr = AgentLauncher()
|
||||
assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger
|
||||
|
||||
def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
# Big backoff -> available_at far in the future.
|
||||
mark_job_transient(jid, 3600, error="429")
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 1
|
||||
assert job["available_at"] is not None
|
||||
# claim must NOT pick it up while available_at is in the future.
|
||||
assert claim_next_job() is None
|
||||
|
||||
def test_transient_requeue_claimable_when_due(self):
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
mark_job_transient(jid, -5, error="429") # available_at in the past
|
||||
c = claim_next_job()
|
||||
assert c is not None and c["id"] == jid
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# D. Launcher transient/permanent finalize (no Popen)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestFinalizeClassified:
|
||||
def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
log = tmp_path / "1.log"
|
||||
log.write_text("Error 429 rate limit exceeded\n")
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1,
|
||||
output_path=str(log))
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 1
|
||||
assert job["available_at"] is not None # backoff-gated
|
||||
assert job["attempts"] == 1 # code-fault budget NOT burned
|
||||
|
||||
def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
log = tmp_path / "2.log"
|
||||
log.write_text("Traceback: ValueError\n")
|
||||
jid = enqueue_job("developer", "r", max_attempts=2)
|
||||
claim_next_job()
|
||||
AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1,
|
||||
output_path=str(log))
|
||||
job = get_job(jid)
|
||||
assert job["status"] == "queued"
|
||||
assert job["transient_attempts"] == 0 # not transient
|
||||
assert job["available_at"] is None # no backoff for code-fault
|
||||
|
||||
def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
|
||||
monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
|
||||
log = tmp_path / "3.log"
|
||||
log.write_text("overloaded_error\n")
|
||||
lr = AgentLauncher()
|
||||
jid = enqueue_job("developer", "r")
|
||||
claim_next_job()
|
||||
lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log))
|
||||
assert get_job(jid)["status"] == "queued" # transient 1 -> requeue
|
||||
# force claimable and retry
|
||||
mark_job_transient(jid, -1) # makes it due; transient=2 now
|
||||
claim_next_job()
|
||||
lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log))
|
||||
assert get_job(jid)["status"] == "failed" # transient budget exhausted
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# E. Circuit breaker
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestCircuitBreaker:
|
||||
def test_opens_after_threshold(self):
|
||||
cb = CircuitBreaker(threshold=3, pause_seconds=300)
|
||||
assert cb.allow_claim() is True
|
||||
cb.record_transient()
|
||||
cb.record_transient()
|
||||
assert cb.state == "closed"
|
||||
cb.record_transient() # 3rd -> open
|
||||
assert cb.state == "open"
|
||||
assert cb.allow_claim() is False # paused, no CLI calls
|
||||
|
||||
def test_recovered_resets_streak(self):
|
||||
cb = CircuitBreaker(threshold=3)
|
||||
cb.record_transient()
|
||||
cb.record_transient()
|
||||
cb.record_recovered()
|
||||
assert cb.consecutive_transient == 0
|
||||
assert cb.state == "closed"
|
||||
|
||||
def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
|
||||
cb = CircuitBreaker(threshold=2, pause_seconds=300)
|
||||
cb.record_transient()
|
||||
cb.record_transient() # open
|
||||
assert cb.state == "open"
|
||||
# Simulate the pause elapsing.
|
||||
cb.opened_at -= 301
|
||||
assert cb.allow_claim() is True # -> half-open (probe)
|
||||
assert cb.state == "half-open"
|
||||
cb.record_recovered() # probe succeeded
|
||||
assert cb.state == "closed"
|
||||
|
||||
def test_half_open_reopens_on_transient(self):
|
||||
cb = CircuitBreaker(threshold=2, pause_seconds=300)
|
||||
cb.record_transient(); cb.record_transient() # open
|
||||
cb.opened_at -= 301
|
||||
cb.allow_claim() # half-open
|
||||
assert cb.state == "half-open"
|
||||
cb.record_transient() # probe failed -> re-open
|
||||
assert cb.state == "open"
|
||||
|
||||
def test_breaker_blocks_worker_claim(self, monkeypatch):
|
||||
monkeypatch.setattr("src.queue_worker.preflight.check",
|
||||
lambda *a, **k: (True, "ok"))
|
||||
called = {"launch": False}
|
||||
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
|
||||
lambda job: called.__setitem__("launch", True))
|
||||
cb = CircuitBreaker(threshold=1, pause_seconds=300)
|
||||
cb.record_transient() # open immediately
|
||||
w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
|
||||
enqueue_job("analyst", "r")
|
||||
w._drain_once()
|
||||
assert called["launch"] is False # breaker open -> no claim, no CLI
|
||||
395
tests/test_stage_engine.py
Normal file
395
tests/test_stage_engine.py
Normal file
@@ -0,0 +1,395 @@
|
||||
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
|
||||
|
||||
These verify the MERGED behavior of what used to be two diverged
|
||||
_try_advance_stage implementations (launcher sync + plane async):
|
||||
|
||||
* happy-path advance for every stage launches the CORRECT agent
|
||||
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
|
||||
* a QG failure does not advance;
|
||||
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
|
||||
* developer retries > 3 -> telegram alert, no further enqueue;
|
||||
* tester FAIL -> rollback to development + enqueue developer;
|
||||
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
|
||||
* launcher AND plane both delegate to the engine.
|
||||
|
||||
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
|
||||
the engine runs against a real isolated sqlite DB.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
# Isolated test DB (same convention as the other suites).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
|
||||
from unittest.mock import MagicMock, patch # noqa: E402
|
||||
|
||||
import src.db as _db # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import stage_engine # noqa: E402
|
||||
from src.stage_engine import advance_stage # noqa: E402
|
||||
from src.stages import get_agent_for_stage # noqa: E402
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db(monkeypatch):
|
||||
"""Fresh isolated DB per test."""
|
||||
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def silence_side_effects(monkeypatch):
|
||||
"""Mock all Plane/Telegram/notification side effects in the engine.
|
||||
|
||||
Everything imported into src.stage_engine that touches the network or sends
|
||||
a message becomes a no-op MagicMock so tests are deterministic and offline.
|
||||
"""
|
||||
for name in (
|
||||
"notify_stage_change",
|
||||
"notify_qg_failure",
|
||||
"notify_approve_requested",
|
||||
"send_telegram",
|
||||
"plane_notify_stage",
|
||||
"plane_notify_qg",
|
||||
"plane_add_comment",
|
||||
"set_issue_in_review",
|
||||
"set_issue_needs_input",
|
||||
"set_issue_in_progress",
|
||||
"set_issue_blocked",
|
||||
):
|
||||
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||
|
||||
|
||||
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(f"plane-{wi}", wi, repo, branch, stage),
|
||||
)
|
||||
task_id = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return task_id
|
||||
|
||||
|
||||
def _stage(task_id):
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
conn.close()
|
||||
return row[0]
|
||||
|
||||
|
||||
def _jobs():
|
||||
conn = get_db()
|
||||
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def _add_developer_runs(task_id, n):
|
||||
conn = get_db()
|
||||
for _ in range(n):
|
||||
conn.execute(
|
||||
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def _pass(*a, **k):
|
||||
return (True, "ok")
|
||||
|
||||
|
||||
def _fail(reason):
|
||||
def _f(*a, **k):
|
||||
return (False, reason)
|
||||
return _f
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestHappyPathAgentSelection:
|
||||
"""The fixed agent-selection: when advancing FROM current_stage, the engine
|
||||
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
|
||||
"""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_stage,expected_next,expected_agent",
|
||||
[
|
||||
("architecture", "development", "developer"),
|
||||
("development", "review", "reviewer"),
|
||||
("review", "testing", "tester"),
|
||||
("testing", "deploy", "deployer"),
|
||||
],
|
||||
)
|
||||
def test_advance_launches_current_stage_agent(
|
||||
self, monkeypatch, current_stage, expected_next, expected_agent
|
||||
):
|
||||
# All QG checks pass for this happy-path suite.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task(current_stage)
|
||||
|
||||
res = advance_stage(
|
||||
task_id, current_stage, "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == expected_next
|
||||
assert _stage(task_id) == expected_next
|
||||
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
|
||||
assert expected_agent == get_agent_for_stage(current_stage)
|
||||
assert res.enqueued_agent == expected_agent
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == expected_agent
|
||||
|
||||
def test_deploy_to_done_no_agent(self, monkeypatch):
|
||||
"""deploy -> done advances but launches no agent (terminal-ish)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert res.advanced is True
|
||||
assert _stage(task_id) == "done"
|
||||
assert res.enqueued_agent is None
|
||||
assert _jobs() == []
|
||||
|
||||
def test_done_is_terminal(self):
|
||||
task_id = _make_task("done")
|
||||
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "done"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# QG failure: do not advance
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestQgFailureDoesNotAdvance:
|
||||
def test_qg_fail_keeps_stage(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.qg_passed is False
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert _jobs() == []
|
||||
|
||||
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
||||
)
|
||||
task_id = _make_task("development")
|
||||
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None)
|
||||
assert stage_engine.notify_qg_failure.called
|
||||
assert stage_engine.plane_notify_qg.called
|
||||
|
||||
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
|
||||
"""finished_agent set -> NO generic QG notification (launcher parity)."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert not stage_engine.notify_qg_failure.called
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestReviewerRequestChanges:
|
||||
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="reviewer")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "developer"
|
||||
|
||||
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
_add_developer_runs(task_id, 3) # already at the max
|
||||
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="reviewer")
|
||||
assert res.rolled_back_to == "development"
|
||||
assert res.alerted is True
|
||||
assert stage_engine.send_telegram.called
|
||||
# No new developer job enqueued past the retry cap.
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tester FAIL -> rollback to development + enqueue developer
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestTesterFail:
|
||||
def test_rollback_and_enqueue_developer(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
|
||||
)
|
||||
task_id = _make_task("testing")
|
||||
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="tester")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "developer"
|
||||
|
||||
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
|
||||
)
|
||||
task_id = _make_task("testing")
|
||||
_add_developer_runs(task_id, 3)
|
||||
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="tester")
|
||||
assert res.rolled_back_to == "development"
|
||||
assert res.alerted is True
|
||||
assert stage_engine.set_issue_blocked.called
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestArchitectConflict:
|
||||
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
|
||||
# 10-conflict.md must exist in the worktree path the engine inspects.
|
||||
wt = tmp_path / "wt"
|
||||
conflict_dir = wt / "docs" / "work-items" / "ET-001"
|
||||
conflict_dir.mkdir(parents=True)
|
||||
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
|
||||
|
||||
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "analysis"
|
||||
assert _stage(task_id) == "analysis"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "analyst"
|
||||
|
||||
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
|
||||
wt = tmp_path / "wt"
|
||||
(wt / "docs").mkdir(parents=True)
|
||||
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
|
||||
)
|
||||
task_id = _make_task("architecture")
|
||||
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="architect")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to is None
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analyst approved-flow (analysis gate): never auto-advances
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestAnalysisApprovedFlow:
|
||||
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
|
||||
)
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="analyst")
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "analysis"
|
||||
assert stage_engine.set_issue_in_review.called
|
||||
assert stage_engine.notify_approve_requested.called
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# launcher + plane both delegate to the engine
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDelegation:
|
||||
def test_launcher_calls_engine(self):
|
||||
from src.agents.launcher import AgentLauncher
|
||||
task_id = _make_task("development", branch="feature/ET-777-deleg")
|
||||
with patch("src.stage_engine.advance_stage") as m:
|
||||
AgentLauncher()._try_advance_stage(
|
||||
run_id=1, agent="developer", repo="enduro-trails",
|
||||
branch="feature/ET-777-deleg",
|
||||
)
|
||||
m.assert_called_once()
|
||||
kwargs = m.call_args.kwargs
|
||||
assert kwargs["task_id"] == task_id
|
||||
assert kwargs["current_stage"] == "development"
|
||||
assert kwargs["finished_agent"] == "developer"
|
||||
|
||||
def test_plane_calls_engine(self):
|
||||
import asyncio
|
||||
from src.webhooks import plane as plane_mod
|
||||
with patch("src.stage_engine.advance_stage") as m:
|
||||
asyncio.run(
|
||||
plane_mod._try_advance_stage(
|
||||
task_id=5, current_stage="analysis", repo="enduro-trails",
|
||||
work_item_id="ET-001", branch="feature/ET-001-x",
|
||||
)
|
||||
)
|
||||
m.assert_called_once()
|
||||
# plane passes positional args; finished_agent (last positional) is None.
|
||||
args = m.call_args.args
|
||||
assert args[0] == 5
|
||||
assert args[1] == "analysis"
|
||||
assert args[-1] is None
|
||||
Reference in New Issue
Block a user