Compare commits

...

22 Commits

Author SHA1 Message Date
Dev Agent
6abdc220d2 test(stage): cover unified stage_engine + launcher/plane delegation
18 tests: happy-path advance per stage with correct agent (ORCH-4 fix),
QG-fail no-advance, reviewer REQUEST_CHANGES rollback+retry/alert, tester FAIL
rollback+retry/block, architect conflict rollback to analysis, analyst
approved-flow no-advance, and launcher+plane both delegating to the engine.
2026-06-03 08:56:25 +03:00
Dev Agent
51401a3ba9 refactor(launcher,plane): delegate stage advance to stage_engine
launcher._try_advance_stage and plane._try_advance_stage are now thin
wrappers over stage_engine.advance_stage. The plane webhook calls the sync
engine via asyncio.to_thread so there is exactly one implementation. The
launcher forwards finished_agent so the agent-specific rollback branches still
fire; the webhook passes None (human :approved:), matching prior behavior.

Also fixes the agent-selection bug in the launcher path: it used to enqueue
get_agent_for_stage(next_stage) (skipping a stage, e.g. analysis->architecture
launched developer instead of architect). The unified engine uses
get_agent_for_stage(current_stage), consistent with plane and gitea.
2026-06-03 08:56:25 +03:00
Dev Agent
0befc49b1e refactor(stage): extract unified stage_engine.advance_stage (M-3)
Merge the two diverged _try_advance_stage implementations (launcher sync +
plane async) into one synchronous engine. Preserves all launcher business
logic (analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester
FAIL rollback+retry, architect conflict rollback) and the plane
check_review_approved PR-by-branch dispatch. Unifies the QG signature
dispatch. Fixes agent selection: advancing FROM current_stage launches
get_agent_for_stage(current_stage), not next_stage.
2026-06-03 08:56:14 +03:00
fd554c8a5a Merge pull request 'ORCH-7: cleanup + hardening (M-4 dead code + M-2 graceful timeout)' (#4) from feature/ORCH-7-hardening into main 2026-06-03 08:31:26 +03:00
Dev Agent
c167c6930d test(launcher): watchdog graceful kill ordering + timeout config + M-4 removal
Cover M-2: SIGTERM-before-SIGKILL ordering, graceful exit within grace skips
SIGKILL, ProcessLookupError before SIGTERM is tolerated (no _record_kill), and
_resolve_timeout per-agent override / default / malformed-JSON fallback.
Cover M-4: _auto_merge_pr removed, _ensure_pr retained.
2026-06-03 08:28:09 +03:00
Dev Agent
49ecb48eb0 feat(launcher): graceful SIGTERM->SIGKILL + configurable agent timeout (M-2)
The watchdog used to time.sleep(timeout) then immediately SIGKILL, which cut
claude off mid-write and left half-written artifacts. It now sends SIGTERM,
polls os.kill(pid, 0) for up to agent_kill_grace_seconds, and only SIGKILL if
the process is still alive; ProcessLookupError is tolerated at every step.

Timeout is now configurable via config.py: agent_timeout_seconds (default 1800),
agent_kill_grace_seconds (default 20), and agent_timeout_overrides_json for
per-agent overrides (e.g. {"reviewer": 3600}). AGENT_TIMEOUT is kept as a
backward-compatible alias. The recorded exit_code stays -9 so the ORCH-1
monitor retry/fail logic is unchanged (timeout-kills classify as permanent and
requeue within max_attempts, no retry loop).
2026-06-03 08:28:03 +03:00
Dev Agent
237732bc64 refactor(launcher): remove dead _auto_merge_pr (M-4)
_auto_merge_pr had zero callers (merge is handled by the deployer agent).
Removed the method; _ensure_pr (still used by the auto-advance path) is kept.
2026-06-03 08:27:52 +03:00
4e52e192e4 Merge pull request 'ORCH-1 (F-2b): persistent job queue instead of in-process daemon threads' (#3) from feature/ORCH-1-job-queue into main 2026-06-03 08:09:23 +03:00
Dev Agent
c23f000c05 fix(preflight): check the binary the launcher actually spawns (ORCH-1)
Container ORCH_CLAUDE_BIN pointed at a non-existent /usr/bin/claude while the
launcher spawns the hardcoded /opt/claude-code/bin/claude.exe. Preflight now
follows AgentLauncher.CLAUDE_BIN (the genuinely executed path), so it no longer
falsely blocks every job in production.
2026-06-03 00:13:44 +03:00
Dev Agent
d0d47058b4 docs(resilience): document preflight/429/backoff/breaker + env vars (ORCH-1) 2026-06-03 00:12:17 +03:00
Dev Agent
a613fd8180 test(resilience): 34 tests for preflight/classifier/backoff/breaker (ORCH-1)
Covers preflight FAIL->queued + cache, transient/permanent classifier +
Retry-After, exp backoff + available_at gating, launcher transient vs permanent
finalize, circuit breaker open/half-open/closed. test_queue worker tests stub
preflight OK. Popen never spawned.
2026-06-03 00:12:17 +03:00
Dev Agent
f314ae09e5 feat(worker): preflight gate + circuit breaker + /queue resilience (ORCH-1)
QueueWorker gates claims behind preflight and the CircuitBreaker (open ->
pause, no CLI calls + Telegram alert; half-open probes one job; closed on
recovery). Wires launcher.on_outcome. /queue exposes resilience snapshot.
2026-06-03 00:12:17 +03:00
Dev Agent
90fdd19394 feat(launcher): classify failures, backoff transient retry, breaker outcome (ORCH-1)
_finalize_job classifies the run log: transient (429/overload) -> backoff
requeue via mark_job_transient with separate transient_attempts budget honouring
Retry-After; permanent -> normal attempts<max. on_outcome callback feeds the
circuit breaker. _backoff_seconds = min(2^n*base, max) | Retry-After.
2026-06-03 00:12:17 +03:00
Dev Agent
4ef87a3959 feat(resilience): cheap preflight + 429/transient error classifier (ORCH-1)
preflight.py: cached CLAUDE_BIN exists + claude --version (no tokens, no
prompt-ping). error_classifier.py: classify_log_file -> transient|permanent
from log tail + Retry-After parsing.
2026-06-03 00:12:17 +03:00
Dev Agent
0cd9b11fe0 feat(queue): resilience schema + backoff helper + config (ORCH-1)
jobs.transient_attempts + available_at columns (idempotent _ensure_column
migration); claim_next_job honours available_at; mark_job_transient (backoff
requeue with separate transient budget). Config: preflight_cache_ttl,
backoff_base/max_seconds, transient_max_attempts, breaker_threshold,
breaker_pause_seconds.
2026-06-03 00:12:17 +03:00
Dev Agent
4be168c0ec docs(queue): document job queue, /queue, env vars (ORCH-1)
ARCHITECTURE job-queue section + flow diagram, README /queue endpoint and
ORCH_MAX_CONCURRENCY/ORCH_QUEUE_POLL_INTERVAL, new docs/ORCH-1_JOB_QUEUE.md.
2026-06-02 23:58:44 +03:00
Dev Agent
2283b8898b test(queue): 19 tests for job queue lifecycle/atomicity/retry/worker (ORCH-1)
Covers enqueue->claim->mark, atomic claim (no double dispatch, 8-thread race),
retry fail->queued->failed, requeue_running_jobs, observability, worker
max_concurrency. Popen fully mocked (no real agent spawned).
2026-06-02 23:58:44 +03:00
Dev Agent
b6d4426a48 feat(worker): background queue worker + lifespan + queue-recovery + /queue (ORCH-1)
queue_worker.QueueWorker drains the queue respecting max_concurrency. main.py
lifespan: queue-recovery (requeue running jobs) after M-1 orphan-recovery, starts
worker and stops it on shutdown. New GET /queue endpoint (counts + recent jobs).
2026-06-02 23:58:44 +03:00
Dev Agent
20d6556e22 refactor(webhooks): enqueue_job instead of in-process launch (ORCH-1)
All 8 webhook launch points (plane x4, gitea x4) now enqueue a job and return
immediately instead of synchronously spawning claude in the uvicorn process.
2026-06-02 23:58:44 +03:00
Dev Agent
3345c2fa0a feat(launcher): launch_job + job-status finalize with retries (ORCH-1)
Refactor launch() into shared _spawn(); add launch_job(job) that threads job_id
through monitor/watchdog. _finalize_job marks done / requeue (attempts<max) /
failed+notify. Internal advance-chain self.launch -> enqueue_job. B-1/B-2/M-1/ORCH-2
spawn logic unchanged.
2026-06-02 23:58:44 +03:00
Dev Agent
fd3dac7d22 feat(queue): add jobs table + queue helpers and config (ORCH-1)
Persistent SQLite job queue (F-2b): jobs table + idx, atomic claim_next_job,
enqueue/mark/count/requeue/get helpers. New settings max_concurrency
(ORCH_MAX_CONCURRENCY) and queue_poll_interval (ORCH_QUEUE_POLL_INTERVAL).
2026-06-02 23:58:44 +03:00
b021ff7cb0 Merge pull request 'ORCH-6: multi-repo (project filter + repo/prefix per project)' (#2) from feature/ORCH-6-multirepo into main 2026-06-02 23:42:29 +03:00
17 changed files with 2874 additions and 318 deletions

View File

@@ -39,6 +39,7 @@ created → analysis → architecture → development → review → testing →
|--------|------|----------|
| GET | `/health` | Health check |
| GET | `/status` | Активные задачи (stage != done) |
| GET | `/queue` | Очередь задач (ORCH-1): counts по статусам + max_concurrency + последние 10 jobs |
| POST | `/webhook/plane` | Plane webhook receiver |
| POST | `/webhook/gitea` | Gitea webhook receiver |
@@ -52,8 +53,9 @@ src/
├── stages.py # State machine (transitions, agents, QG)
├── notifications.py # Уведомления (логирование)
├── plane_sync.py # Синхронизация статусов с Plane API
├── queue_worker.py # ORCH-1: фоновый воркер очереди (claim → launch_job)
├── agents/
│ └── launcher.py # AgentLauncher: launch, monitor, watchdog, auto-advance
│ └── launcher.py # AgentLauncher: launch/launch_job, monitor, watchdog, auto-advance
├── webhooks/
│ ├── plane.py # Plane webhook handler
│ └── gitea.py # Gitea webhook handler (push, PR, CI status)
@@ -107,6 +109,36 @@ uvicorn src.main:app --reload --port 8500
| `ORCH_REPOS_DIR` | Repos dir (container) | `/repos` |
| `ORCH_HOST_REPOS_DIR` | Repos dir (host) | `/home/slin/repos` |
| `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` |
| `ORCH_MAX_CONCURRENCY` | Сколько jobs воркер запускает параллельно (ORCH-1) | `1` |
| `ORCH_QUEUE_POLL_INTERVAL` | Период опроса очереди воркером, сек (ORCH-1) | `2.0` |
| `ORCH_PREFLIGHT_CACHE_TTL` | Кэш preflight (CLI/net), сек (ORCH-1 resilience) | `45` |
| `ORCH_BACKOFF_BASE_SECONDS` | База exp-backoff для transient (429) | `10` |
| `ORCH_BACKOFF_MAX_SECONDS` | Потолок backoff | `600` |
| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Ретраи для 429/недоступности | `5` |
| `ORCH_BREAKER_THRESHOLD` | transient подряд до открытия breaker | `3` |
| `ORCH_BREAKER_PAUSE_SECONDS` | Пауза при открытом breaker | `300` |
## Очередь задач (ORCH-1 / F-2b)
Webhook-хэндлеры больше не спавнят claude-агентов синхронно в процессе uvicorn.
Вместо этого они кладут **job** в персистентную SQLite-таблицу `jobs`
(`enqueue_job`, мгновенный ответ), а фоновый воркер (`src/queue_worker.py`)
забирает jobs с учётом `ORCH_MAX_CONCURRENCY` и запускает агента (`launch_job`,
та же Popen-логика, что и раньше).
Преимущества:
- **Рестарт-safe.** При старте jobs со статусом `running` возвращаются в `queued`
(queue-recovery в lifespan) — работа не теряется.
- **Лимит параллелизма.** Воркер не превышает `ORCH_MAX_CONCURRENCY`.
- **Ретраи.** Упавший job (exit≠0) ретраится пока `attempts < max_attempts`,
потом `failed` + Telegram-нотификация.
Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`.
**Resilience-слой:** дешёвый preflight (CLI/net, кэш, без токенов) гейтит claim;
429/overload детектится по логу (transient vs permanent), transient ретраится с
exp-backoff (`available_at`, Retry-After); circuit breaker паузит воркер после N
transient подряд. Подробности: `docs/ORCH-1_JOB_QUEUE.md`.
## Multi-repo: реестр проектов (ORCH-6)

View File

@@ -264,9 +264,71 @@ services:
- ~~Shared `/repos` checkout (гонки при параллельных задачах).~~ **РЕШЕНО (ORCH-2 / S-4):**
git worktree per task/branch — см. раздел «Изоляция через git worktree» ниже.
- **In-process daemon-потоки.** Агенты запускаются в daemon-потоках uvicorn. При
рестарте uvicorn запущенные агенты осиротевают → ловит orphan-recovery (M-1).
Целевая архитектура — очередь задач (F-2b, отдельно).
- ~~In-process daemon-потоки (рестарт → сироты, потеря работы).~~ **РЕШЕНО (ORCH-1 / F-2b):**
персистентная очередь jobs + фоновый воркер — см. раздел «Очередь задач (ORCH-1)» ниже.
Daemon-потоки monitor/watchdog остаются для одного запущенного агента, но при
рестарте его job возвращается в `queued` (queue-recovery) и переподхватывается.
## Очередь задач (ORCH-1 / F-2b)
Раньше webhook-хэндлер **синхронно** спавнил `subprocess.Popen` + 2 daemon-thread
прямо в процессе uvicorn (8 точек вызова). Рестарт = сироты + потеря работы,
нет лимита параллелизма, нет ретраев.
### Flow
```
webhook (plane/gitea) background thread (queue_worker)
│ │
enqueue_job() ---> [ jobs table ] <--- claim_next_job() (atomic queued->running)
(мгновенный status=queued │
ответ 200) launch_job(job)
AgentLauncher._spawn (Popen claude)
_monitor_agent (proc.wait, commit/push,
│ advance stage)
_finalize_job:
exit 0 -> mark_job done
exit !=0 & attempts<max -> requeue (queued)
exit !=0 & attempts>=max -> failed + Telegram
```
### Таблица `jobs`
| Колонка | Назначение |
|--------|------------|
| `status` | `queued``running``done` \| `failed` |
| `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) |
| `run_id` | FK на `agent_runs.id` после старта |
| `task_content` | ТЗ, которое пишется в task-файл агента |
| `error` | последняя ошибка |
`idx_jobs_status (status, id)` — быстрый FIFO-выбор queued.
### Атомарный claim
`claim_next_job()` делает `SELECT queued ORDER BY id LIMIT 1``UPDATE ... WHERE id=? AND
status='queued'` и проверяет `rowcount`. При гонке двух тиков лишь один UPDATE
переведёт строку в `running` (rowcount==1); проигравший берёт следующий job.
### Queue-recovery (рестарт-safe)
В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`:
jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`.
Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
### Конфиг
- `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs.
- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0) — период опроса.
Наблюдаемость: `GET /queue` — counts по статусам + последние 10 jobs.
> Совместимость: `launcher.launch()` (прямой синхронный запуск, `job_id=None`)
> сохранён для обратной совместимости. Очередь использует `launch_job()`;
> оба разделяют `_spawn()` (Popen-логика B-2 не изменена).
- **Gitea CI не настроен.** QG развития теперь локальный (`check_tests_local`);
Gitea CI-статусы не являются authoritative и не блокируют pipeline.
- **Docker внутри контейнера orchestrator НЕДОСТУПЕН.** Деплой идёт только через

127
docs/ORCH-1_JOB_QUEUE.md Normal file
View File

@@ -0,0 +1,127 @@
# ORCH-1 (F-2b): Persistent Job Queue
**Дата:** 2026-06-02
**Ветка:** `feature/ORCH-1-job-queue`
**Источник:** AUDIT_2026-06-02 (B-2 / F-2b)
## Проблема
Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил
`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в
процессе uvicorn, из **8 webhook-точек**. Последствия:
- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты,
работа теряется (M-1 лишь помечал `exit=-1` и звал человека).
- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude.
- **Нет ретраев** — упавший агент просто мёртв.
## Решение
Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер:
1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200.
2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает
jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента
(`launcher.launch_job`, та же Popen-логика).
3. По завершении `_monitor_agent``_finalize_job`:
- `exit 0``done`;
- `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
- `exit != 0` & `attempts >= max_attempts``failed` + Telegram.
## Что изменено
| Файл | Изменение |
|------|-----------|
| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch``enqueue_job` |
| `src/webhooks/plane.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/webhooks/gitea.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/queue_worker.py` | **НОВЫЙ**`QueueWorker` (drain loop + max_concurrency + graceful stop) |
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` |
| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
## Атомарность claim
```sql
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
```
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs).
## Сохранённые фиксы (НЕ сломаны)
- **B-1** task-file write (direct `open()` в worktree) — без изменений.
- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него.
- **ORCH-2** worktree per task — без изменений.
- **ORCH-6** project registry/filter — без изменений.
## Acceptance
| # | Проверка | Статус |
|---|----------|--------|
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
| 8 | `/queue` | ✅ counts + recent |
## Тесты
```bash
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
--entrypoint python3 $IMG -m pytest tests/ -q
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
```
---
## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker)
Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса
проблем лечатся по-разному.
### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены
Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version`
(timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с).
FAIL → воркер НЕ claimит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping.
### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`)
rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file`
читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...`
`transient` или `permanent`. Извлекает `Retry-After`.
- **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts`
(лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет.
- **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`.
### C. Backoff + `available_at`
Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция
`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL
OR available_at <= datetime('now'))`. При transient: `available_at = now +
min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max).
### D. Circuit breaker (`CircuitBreaker` в queue_worker)
N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ
не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job);
ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти
воркера, отражается в `/queue.resilience`.
Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла).
### Конфиг (config.py)
`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`,
`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`.
### Тесты
`tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force),
классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After,
`available_at` гейтинг), launcher transient/permanent finalize, breaker
(open/half-open/closed/re-open, блок claim).

View File

@@ -1,10 +1,12 @@
import subprocess
import os
import json
import logging
import threading
import signal
import time
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from ..git_worktree import ensure_worktree, get_worktree_path
from ..qg.checks import QG_CHECKS
@@ -53,11 +55,17 @@ class AgentLauncher:
}
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
AGENT_TIMEOUT = 1800 # 30 minutes
# ORCH-7 (M-2): timeout is now configurable. AGENT_TIMEOUT stays as a
# backward-compatible alias for the default; the actual value (and per-agent
# overrides) live in settings and are resolved via _resolve_timeout().
AGENT_TIMEOUT = settings.agent_timeout_seconds
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
"""
Launch a Claude CLI agent.
Launch a Claude CLI agent directly (legacy synchronous path).
Kept for backward compatibility (direct callers / existing tests). The
ORCH-1 job queue uses launch_job() instead, but both share _spawn().
Args:
agent: Agent role (analyst, architect, developer, reviewer, tester)
@@ -68,6 +76,31 @@ class AgentLauncher:
Returns:
agent_run_id from DB
"""
return self._spawn(agent, repo, task_content, task_id, job_id=None)
def launch_job(self, job: dict) -> int:
"""ORCH-1: launch an agent for a claimed queue job.
Same spawn path as launch(), but threads job['id'] through so the monitor
can update the job's status (done / requeue / failed) and link jobs.run_id
to the agent_runs row. Returns the agent_run_id.
"""
return self._spawn(
job["agent"],
job["repo"],
job.get("task_content"),
job.get("task_id"),
job_id=job["id"],
)
def _spawn(self, agent: str, repo: str, task_content: str = None,
task_id: int = None, job_id: int = None) -> int:
"""Shared spawn implementation for launch() and launch_job().
When job_id is set, the monitor/watchdog drive the jobs table status
(ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
(B-1 / ORCH-2) are unchanged.
"""
config = self.AGENT_CONFIGS.get(agent)
if not config:
raise ValueError(f"Unknown agent: {agent}")
@@ -98,6 +131,14 @@ class AgentLauncher:
run_id = cursor.lastrowid
conn.commit()
# ORCH-1: link this job to the agent_runs row and stamp started_at.
if job_id is not None:
conn.execute(
"UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
(run_id, job_id),
)
conn.commit()
# Prepare output log path
output_path = f"/app/data/runs/{run_id}.log"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
@@ -154,6 +195,7 @@ class AgentLauncher:
t = threading.Thread(
target=self._watchdog,
args=(proc.pid, run_id),
kwargs={"job_id": job_id, "agent": agent},
daemon=True,
)
t.start()
@@ -163,6 +205,7 @@ class AgentLauncher:
m = threading.Thread(
target=self._monitor_agent,
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
kwargs={"job_id": job_id},
daemon=True,
)
m.start()
@@ -171,26 +214,102 @@ class AgentLauncher:
notify_agent_started(run_id, agent, task_id)
return run_id
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
"""Kill agent if it exceeds timeout."""
import time
@staticmethod
def _resolve_timeout(agent: str = None) -> int:
"""ORCH-7 (M-2): resolve the wall-clock timeout for an agent.
Per-agent override from settings.agent_timeout_overrides_json (a JSON object
like {"reviewer": 3600}) wins; otherwise the global default
settings.agent_timeout_seconds is used. A malformed override JSON is ignored
(falls back to the default) and only logged, so a bad env never bricks runs.
"""
default = settings.agent_timeout_seconds
raw = (settings.agent_timeout_overrides_json or "").strip()
if agent and raw:
try:
overrides = json.loads(raw)
if isinstance(overrides, dict) and agent in overrides:
return int(overrides[agent])
except (ValueError, TypeError) as e:
logger.warning(f"Invalid agent_timeout_overrides_json, using default: {e}")
return default
def _watchdog(self, pid: int, run_id: int, timeout: int = None,
job_id: int = None, agent: str = None):
"""Kill agent if it exceeds its timeout.
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
code and drives the job retry/fail logic, so the watchdog itself only needs
to terminate the process and record the agent_runs exit. job_id is accepted
for symmetry.
ORCH-7 (M-2): graceful shutdown. Instead of an immediate SIGKILL (which cuts
claude off mid-write and leaves half-written artifacts), send SIGTERM first,
give the process up to settings.agent_kill_grace_seconds to flush and exit on
its own, and only SIGKILL if it is still alive after the grace window. If the
process exits during the grace window, SIGKILL is NOT sent.
ProcessLookupError is tolerated at every step (the process may already be
gone). The recorded exit_code stays -9 to match the existing retry/fail
contract regardless of which signal actually reaped it.
"""
if timeout is None:
timeout = self.AGENT_TIMEOUT
timeout = self._resolve_timeout(agent)
time.sleep(timeout)
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
try:
os.kill(pid, signal.SIGTERM)
logger.warning(
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
)
except ProcessLookupError:
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
return # nothing to record: the monitor's proc.wait() owns the exit
# Phase 2: poll for graceful exit within the grace window.
grace = settings.agent_kill_grace_seconds
poll_interval = 0.5
waited = 0.0
while waited < grace:
time.sleep(poll_interval)
waited += poll_interval
try:
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
except ProcessLookupError:
logger.info(
f"Agent run_id={run_id} exited gracefully after SIGTERM "
f"({waited:.1f}s); no SIGKILL needed"
)
self._record_kill(run_id)
return
# Phase 3: still alive -> hard SIGKILL.
try:
os.kill(pid, signal.SIGKILL)
logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout")
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
logger.warning(
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
)
conn.commit()
conn.close()
except ProcessLookupError:
pass # Already finished
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
self._record_kill(run_id)
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
@staticmethod
def _record_kill(run_id: int):
"""Stamp the agent_runs row as timeout-killed (exit_code=-9).
ORCH-1: -9 is the existing kill-exit contract the monitor/retry logic keys
off, so we keep it stable whether the reap came from SIGTERM or SIGKILL.
"""
conn = get_db()
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?",
(run_id,),
)
conn.commit()
conn.close()
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
"""Wait for agent to finish, commit+push results, update DB.
B-2 fix: stdout already goes straight to the log file via Popen, so we just
@@ -318,8 +437,142 @@ class AgentLauncher:
if exit_code == 0:
self._try_advance_stage(run_id, agent, repo, branch)
# ORCH-1: drive the job-queue status for queue-launched jobs only.
# (Legacy direct launch() has job_id=None and is unaffected.)
if job_id is not None:
self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)
def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
"""Exponential backoff for transient failures, honouring Retry-After.
backoff = min(2^transient_attempts * base, max). If the server sent a
Retry-After, use the larger of the two (never poll sooner than asked).
"""
base = settings.backoff_base_seconds
cap = settings.backoff_max_seconds
backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
if retry_after is not None and retry_after > 0:
backoff = max(backoff, min(retry_after, cap))
return int(backoff)
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
"""ORCH-1: update the jobs row after the agent process finished.
exit_code == 0 -> done (and resets the breaker streak via on_outcome).
exit_code != 0 -> classify the failure from the run log tail (token-free):
- TRANSIENT (429/overload/network): backoff-requeue with available_at in
the future + a SEPARATE transient_attempts budget
(settings.transient_max_attempts), honouring Retry-After. Reported to
the breaker so it opens after N consecutive transient failures.
- PERMANENT (code fault): ordinary attempts < max_attempts requeue,
otherwise 'failed' + Telegram.
"""
from ..db import get_job, mark_job
from ..error_classifier import classify_log_file
try:
job = get_job(job_id)
if not job:
return
if exit_code == 0:
mark_job(job_id, "done", run_id=run_id)
logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
self._record_outcome(transient=False, recovered=True)
return
# Classify the failure from the agent log tail (no token cost).
kind, retry_after = "permanent", None
log_path = output_path or f"/app/data/runs/{run_id}.log"
try:
kind, retry_after = classify_log_file(log_path)
except Exception:
pass
if kind == "transient":
self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
else:
self._finalize_permanent(job_id, agent, run_id, exit_code, job)
except Exception as e:
logger.error(f"Job {job_id}: _finalize_job error: {e}")
def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
"""Transient (429/overload/net) failure -> backoff requeue or fail when budget out."""
from ..db import mark_job, mark_job_transient
tattempts = job.get("transient_attempts", 0)
tmax = settings.transient_max_attempts
err = (f"transient (429/overload) agent {agent} exit={exit_code} "
f"(run_id={run_id}); retry_after={retry_after}")
self._record_outcome(transient=True, recovered=False)
if tattempts < tmax:
backoff = self._backoff_seconds(tattempts + 1, retry_after)
mark_job_transient(job_id, backoff, error=err)
logger.warning(
f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
)
else:
mark_job(job_id, "failed", run_id=run_id, error=err)
logger.error(
f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
)
self._notify_failed(job_id, agent, job, run_id,
f"transient (rate-limit) after {tattempts} attempts")
def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
"""Permanent (code-fault) failure -> normal attempts<max requeue, then fail."""
from ..db import mark_job
attempts = job.get("attempts", 0)
max_attempts = job.get("max_attempts", 2)
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
self._record_outcome(transient=False, recovered=False)
if attempts < max_attempts:
mark_job(job_id, "queued", run_id=run_id, error=err)
logger.warning(
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
f"requeued (attempt {attempts}/{max_attempts})"
)
else:
mark_job(job_id, "failed", run_id=run_id, error=err)
logger.error(
f"Job {job_id} ({agent}) failed permanently after "
f"{attempts} attempts (exit={exit_code})"
)
self._notify_failed(job_id, agent, job, run_id,
f"{attempts} attempts (exit={exit_code})")
def _notify_failed(self, job_id, agent, job, run_id, why):
try:
from ..notifications import send_telegram
send_telegram(
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
f"failed: {why}. Logs: /app/data/runs/{run_id}.log"
)
except Exception:
pass
def _record_outcome(self, transient: bool, recovered: bool):
"""Forward the run outcome to the circuit breaker (if a worker is wired).
Decoupled via a settable callback (set by QueueWorker.start) so the launcher
does not hard-import the worker (avoids a cycle) and tests can run the
launcher standalone.
"""
cb = getattr(self, "on_outcome", None)
if cb:
try:
cb(transient=transient, recovered=recovered)
except Exception:
pass
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
"""After agent finishes successfully, check QG and advance stage if possible."""
"""After agent finishes successfully, advance the stage via the unified engine.
ORCH-4 / M-3: the 174-line body that used to live here moved into
src/stage_engine.advance_stage(). This is now a thin wrapper: it looks up
the task by (repo, branch) and delegates. `agent` is forwarded as
finished_agent so the analyst/reviewer/tester/architect rollback branches
still trigger exactly as before. The agent-selection bug (it used to call
get_agent_for_stage(next_stage)) is fixed inside the engine.
"""
try:
conn = get_db()
task_row = conn.execute(
@@ -331,174 +584,15 @@ class AgentLauncher:
return
task_id, current_stage, work_item_id = task_row
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
if not next_stage:
return
# Run QG check if defined
if qg_name and qg_name in QG_CHECKS:
check_fn = QG_CHECKS[qg_name]
if qg_name in ("check_analysis_approved",):
# Requires human approval - post request comment if analyst just finished
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
files_check = QG_CHECKS.get("check_analysis_complete")
if files_check:
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review
from ..plane_sync import set_issue_in_review
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
)
notify_approve_requested(task_id)
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
else:
# Check if questions file exists (in the task worktree)
import os as _os
questions_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md"
)
if _os.path.isfile(questions_path):
# Analyst has questions -> Needs Input
from ..plane_sync import set_issue_needs_input
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
)
from ..notifications import send_telegram
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
else:
# No artifacts and no questions
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
)
return
elif qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) signature — already worktree-aware.
passed, reason = check_fn(repo, branch)
elif qg_name == "check_tests_passed":
# Artifact check — pass branch so it reads from the worktree.
passed, reason = check_fn(repo, work_item_id or "", branch)
else:
# Other artifact checks (check_architecture_done, etc.) — worktree-aware.
passed, reason = check_fn(repo, work_item_id or "", branch)
if not passed:
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
# If reviewer says REQUEST_CHANGES, rollback to development
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
# Count retries
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
else:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
logger.error(f"Task {task_id}: max retries reached")
# Task 6: Tester FAIL -> rollback to development
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
)
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
# Task 8: Architect conflict -> rollback to analysis
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
import os as _os
conflict_path = _os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md"
)
if _os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
return
return
elif qg_name:
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})")
# Launch next agent if defined
next_agent = get_agent_for_stage(next_stage)
if next_agent:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
from ..stage_engine import advance_stage
advance_stage(
task_id=task_id,
current_stage=current_stage,
repo=repo,
work_item_id=work_item_id,
branch=branch,
finished_agent=agent,
)
except Exception as e:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
@@ -534,47 +628,6 @@ class AgentLauncher:
logger.error(f"Failed to create PR for {branch}: {e}")
return None
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
base_url = f"{settings.gitea_url}/api/v1"
try:
resp = httpx.get(
f"{base_url}/repos/{owner}/{repo}/pulls",
params={"state": "open", "head": branch},
headers=headers, timeout=10
)
resp.raise_for_status()
prs = resp.json()
if not prs:
pr_number = self._ensure_pr(repo, branch, 0)
if not pr_number:
return False
else:
pr_number = prs[0]["number"]
resp = httpx.post(
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
json={"Do": "merge"},
headers=headers, timeout=30
)
if resp.status_code in (200, 204):
logger.info(f"PR #{pr_number} merged for {branch}")
update_task_stage(task_id, "done")
notify_stage_change(task_id, "deploy", "done")
plane_notify_stage(work_item_id, "deploy", "done")
from ..notifications import send_telegram
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
return True
else:
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
return False
except Exception as e:
logger.error(f"Auto-merge failed for {branch}: {e}")
return False
def _write_task_file(self, repo: str, branch: str, task_file: str, content: str):
"""Write task file directly into the task's worktree.

View File

@@ -30,6 +30,42 @@ class Settings(BaseSettings):
# DB
db_path: str = "/app/data/orchestrator.db"
# ORCH-1 (F-2b): persistent job queue / background worker.
# max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY)
# queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL)
max_concurrency: int = 1
queue_poll_interval: float = 2.0
# ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker.
# preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds);
# the worker does NOT re-run `claude --version` more often
# than this (env ORCH_PREFLIGHT_CACHE_TTL).
# backoff_base_seconds -> base for exponential transient backoff.
# backoff_max_seconds -> ceiling for the transient backoff.
# transient_max_attempts -> retry budget for transient (429/overload/network)
# failures, separate from code-fault `attempts`.
# breaker_threshold -> consecutive transient failures that OPEN the breaker.
# breaker_pause_seconds -> how long the breaker stays open before half-open.
preflight_cache_ttl: int = 45
backoff_base_seconds: int = 10
backoff_max_seconds: int = 600
transient_max_attempts: int = 5
breaker_threshold: int = 3
breaker_pause_seconds: int = 300
# ORCH-7 (M-2): agent timeout + graceful kill.
# agent_timeout_seconds -> default per-agent wall-clock budget; the watchdog
# kills the run after this (env ORCH_AGENT_TIMEOUT_SECONDS).
# agent_kill_grace_seconds-> pause between SIGTERM and SIGKILL so claude can
# flush artifacts before the hard kill
# (env ORCH_AGENT_KILL_GRACE_SECONDS).
# agent_timeout_overrides_json -> optional per-agent override JSON object,
# e.g. {"reviewer": 3600, "architect": 2700}
# (env ORCH_AGENT_TIMEOUT_OVERRIDES_JSON).
agent_timeout_seconds: int = 1800
agent_kill_grace_seconds: int = 20
agent_timeout_overrides_json: str = ""
# Telegram notifications
telegram_bot_token: str = ""

266
src/db.py
View File

@@ -40,10 +40,44 @@ def init_db():
exit_code INTEGER,
output_path TEXT
);
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
-- return immediately; a background worker claims jobs (respecting
-- max_concurrency), spawns the claude agent, and updates the status.
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
repo TEXT NOT NULL,
task_id INTEGER, -- FK tasks.id (nullable)
task_content TEXT, -- written to the agent task_file
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 2,
run_id INTEGER, -- agent_runs.id once started
error TEXT, -- last error message
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
finished_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
""")
# Lightweight migration: add resilience columns to a pre-existing jobs table
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
conn.close()
def _ensure_column(conn, table: str, column: str, decl: str):
"""Add a column to `table` if it does not already exist (idempotent migration)."""
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if column not in cols:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
conn.commit()
def get_task_by_plane_id(plane_id: str) -> dict | None:
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
conn = get_db()
@@ -105,3 +139,235 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
next_num = 1
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------
def enqueue_job(
agent: str,
repo: str,
task_content: str | None = None,
task_id: int | None = None,
max_attempts: int = 2,
) -> int:
"""Enqueue a new job (status='queued'). Returns the new job id.
This is what webhook handlers call instead of launching an agent in-process:
it is a fast DB INSERT that returns immediately. The background worker
(queue_worker) picks the job up later.
"""
conn = get_db()
cursor = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
"VALUES (?, ?, ?, ?, ?)",
(agent, repo, task_id, task_content, max_attempts),
)
job_id = cursor.lastrowid
conn.commit()
conn.close()
return job_id
def claim_next_job() -> dict | None:
"""Atomically claim the oldest queued job and mark it 'running'.
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
and we check `rowcount`. If two worker ticks race for the same row, only the
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
and retries the SELECT. We rely on SQLite's default per-connection transaction
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
when the queue is empty.
"""
conn = get_db()
try:
while True:
row = conn.execute(
"SELECT id FROM jobs WHERE status='queued' "
"AND (available_at IS NULL OR available_at <= datetime('now')) "
"ORDER BY id LIMIT 1"
).fetchone()
if not row:
return None
job_id = row["id"]
cur = conn.execute(
"UPDATE jobs SET status='running', "
"attempts = attempts + 1, started_at = datetime('now') "
"WHERE id = ? AND status='queued'",
(job_id,),
)
conn.commit()
if cur.rowcount == 1:
claimed = conn.execute(
"SELECT * FROM jobs WHERE id = ?", (job_id,)
).fetchone()
return dict(claimed)
# Lost the race for this row; loop and try the next queued job.
finally:
conn.close()
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
error: str | None = None) -> None:
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
Increments `transient_attempts` (separate from the code-fault `attempts`),
sets status back to 'queued', and gates re-pickup via `available_at` =
now + backoff seconds. started_at/finished_at are cleared.
"""
conn = get_db()
sets = [
"status='queued'",
"transient_attempts = transient_attempts + 1",
"available_at = datetime('now', ?)",
"started_at = NULL",
"finished_at = NULL",
]
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
if error is not None:
sets.append("error = ?")
params.append(error)
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def mark_job(
job_id: int,
status: str,
run_id: int | None = None,
error: str | None = None,
):
"""Update a job's status (queued|running|done|failed).
- run_id (optional): link to the agent_runs row that executed this job.
- error (optional): last error message (for failed/retry).
- 'done'/'failed' also stamp finished_at.
- 'queued' (requeue for retry) clears started_at/finished_at so the next
claim treats it as fresh.
"""
conn = get_db()
sets = ["status = ?"]
params: list = [status]
if run_id is not None:
sets.append("run_id = ?")
params.append(run_id)
if error is not None:
sets.append("error = ?")
params.append(error)
if status in ("done", "failed"):
sets.append("finished_at = datetime('now')")
elif status == "queued":
sets.append("started_at = NULL")
sets.append("finished_at = NULL")
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def count_running_jobs() -> int:
"""Number of jobs currently in 'running' status (for max_concurrency)."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE status='running'"
).fetchone()[0]
conn.close()
return int(n)
def requeue_running_jobs() -> int:
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
claim does NOT re-increment beyond what is needed; claim_next_job increments on
pickup). Returns the number of requeued jobs.
"""
conn = get_db()
cur = conn.execute(
"UPDATE jobs SET status='queued', started_at = NULL "
"WHERE status='running'"
)
conn.commit()
n = cur.rowcount
conn.close()
return int(n)
def get_job(job_id: int) -> dict | None:
"""Fetch a single job by id."""
conn = get_db()
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
conn.close()
return dict(row) if row else None
def job_status_counts() -> dict:
"""Return counts grouped by status (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
).fetchall()
conn.close()
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
for r in rows:
counts[r["status"]] = r["n"]
return counts
def recent_jobs(limit: int = 10) -> list[dict]:
"""Return the most recent jobs (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# ORCH-1b (resilience): transient backoff helpers
# ---------------------------------------------------------------------------
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
Unlike a code-fault requeue, this:
- increments `transient_attempts` (a separate budget from code-fault attempts)
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
up until the backoff window elapses
- sets status back to 'queued' and clears started_at/finished_at
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
"""
conn = get_db()
conn.execute(
"UPDATE jobs SET status='queued', "
"transient_attempts = transient_attempts + 1, "
"available_at = datetime('now', ? || ' seconds'), "
"started_at = NULL, finished_at = NULL, "
"error = COALESCE(?, error) "
"WHERE id = ?",
(f"+{int(round(delay_seconds))}", error, job_id),
)
conn.commit()
conn.close()
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
delay = min(2**transient_attempts * base, max). If the server sent a
Retry-After hint we honour it as a floor (use the larger of the two so we
never poll sooner than the server asked).
`transient_attempts` is the count AFTER this failure (i.e. how many transient
failures have occurred), so the first backoff uses 2**1.
"""
base = getattr(settings, "backoff_base_seconds", 10)
cap = getattr(settings, "backoff_max_seconds", 600)
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
if retry_after is not None and retry_after > 0:
return float(min(max(exp, retry_after), cap))
return float(exp)

87
src/error_classifier.py Normal file
View File

@@ -0,0 +1,87 @@
"""ORCH-1 resilience: classify an agent failure as transient vs permanent.
Rate limits / overload / network blips cannot be reliably predicted in advance,
so we classify *after the run* by scanning the agent's combined stdout/stderr log
(B-2 sends both to /app/data/runs/<run_id>.log).
- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc.
=> backoff + transient retry (separate counter, larger budget).
- permanent -> a genuine code fault / agent error
=> normal attempts < max_attempts, then 'failed'.
Also extracts a Retry-After hint (seconds) when the server provided one.
"""
import re
# Case-insensitive substrings/patterns that signal a transient/rate-limit issue.
_TRANSIENT_PATTERNS = [
r"\b429\b",
r"rate[\s_-]*limit",
r"rate_limit_error",
r"overloaded",
r"overloaded_error",
r"too many requests",
r"quota",
r"insufficient[_\s-]*quota",
r"retry[\s-]*after",
r"service unavailable",
r"\b503\b",
r"\b529\b",
r"timed out",
r"timeout",
r"connection (reset|refused|error|aborted)",
r"temporarily unavailable",
r"econnreset",
r"etimedout",
]
_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE)
# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or
# "retry after 30 seconds". Returns the integer seconds.
_RETRY_AFTER_RE = re.compile(
r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)",
re.IGNORECASE,
)
def classify_text(text: str) -> str:
"""Return 'transient' or 'permanent' for a chunk of log/stderr text."""
if not text:
return "permanent"
return "transient" if _TRANSIENT_RE.search(text) else "permanent"
def parse_retry_after(text: str) -> int | None:
"""Return Retry-After seconds if present in the text, else None."""
if not text:
return None
m = _RETRY_AFTER_RE.search(text)
if m:
try:
return int(m.group(1))
except (TypeError, ValueError):
return None
return None
def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]:
"""Classify the tail of a log file.
Reads the last `tail_bytes` of the log (rate-limit messages appear near the
end) and returns (classification, retry_after_seconds_or_None).
On any read error, treats it as 'permanent' (no special backoff).
"""
if not path:
return "permanent", None
try:
with open(path, "rb") as f:
try:
f.seek(-tail_bytes, 2)
except OSError:
f.seek(0)
data = f.read()
text = data.decode("utf-8", errors="replace")
except Exception:
return "permanent", None
return classify_text(text), parse_retry_after(text)

View File

@@ -51,7 +51,25 @@ async def lifespan(app: FastAPI):
except Exception:
pass
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
yield
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
# worker that died on the previous restart -> put it back to 'queued' so the
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
from .db import requeue_running_jobs
requeued = requeue_running_jobs()
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()
try:
yield
finally:
# Graceful shutdown of the worker (running agents keep going; their jobs
# are requeued on next start via queue-recovery if the process dies).
worker.stop()
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
@@ -73,3 +91,17 @@ async def status():
).fetchall()
conn.close()
return {"active_tasks": [dict(t) for t in tasks]}
@app.get("/queue")
async def queue():
"""ORCH-1: job-queue observability — status counts + recent jobs."""
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"recent": recent_jobs(10),
}

106
src/preflight.py Normal file
View File

@@ -0,0 +1,106 @@
"""ORCH-1 resilience: cheap preflight check (CLI / network available?).
Goal: before the worker claims a job, confirm the claude CLI binary and runtime
are reachable WITHOUT spending any tokens. We only do local/cheap checks:
1. os.path.exists(CLAUDE_BIN) -- instant
2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API
The result is cached for `preflight_cache_ttl` seconds so we do not re-run
`claude --version` on every worker tick.
🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
rate limit and add latency. Preflight is local-only.
"""
import os
import time
import logging
import subprocess
from .config import settings
logger = logging.getLogger("orchestrator.preflight")
_VERSION_TIMEOUT = 5
class _PreflightCache:
def __init__(self):
self.ts: float = 0.0
self.ok: bool = False
self.reason: str = "not checked yet"
_cache = _PreflightCache()
def _claude_bin() -> str:
"""Resolve the claude binary preflight should check.
Must match the binary the launcher actually spawns. The launcher hardcodes
AgentLauncher.CLAUDE_BIN for the real Popen, so we prefer that; we only fall
back to settings.claude_bin / a default if it is somehow unset. (Note: the
container's ORCH_CLAUDE_BIN may point elsewhere; preflight follows the path
that is genuinely executed, not the unused env override.)
"""
try:
from .agents.launcher import AgentLauncher
launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None)
if launcher_bin and os.path.exists(launcher_bin):
return launcher_bin
# Launcher path not present -> fall back to configured/default.
return launcher_bin or getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
except Exception:
return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
def _run_version(bin_path: str) -> tuple[bool, str]:
"""`claude --version` — proves the CLI runs without touching the API."""
try:
r = subprocess.run(
[bin_path, "--version"],
capture_output=True,
text=True,
timeout=_VERSION_TIMEOUT,
)
if r.returncode == 0:
return True, (r.stdout or r.stderr or "").strip()[:120] or "ok"
return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}"
except subprocess.TimeoutExpired:
return False, f"--version timed out after {_VERSION_TIMEOUT}s"
except FileNotFoundError:
return False, "claude binary not found (FileNotFoundError)"
except Exception as e: # pragma: no cover - defensive
return False, f"--version error: {e}"
def _compute() -> tuple[bool, str]:
bin_path = _claude_bin()
if not os.path.exists(bin_path):
return False, f"CLAUDE_BIN not found: {bin_path}"
return _run_version(bin_path)
def check(force: bool = False) -> tuple[bool, str]:
"""Return (ok, reason). Cached for preflight_cache_ttl seconds.
force=True bypasses the cache (used by the breaker half-open probe / tests).
"""
now = time.time()
ttl = settings.preflight_cache_ttl
if not force and _cache.ts > 0 and (now - _cache.ts) < ttl:
return _cache.ok, _cache.reason
ok, reason = _compute()
_cache.ts = now
_cache.ok = ok
_cache.reason = reason
if not ok:
logger.warning(f"Preflight FAIL: {reason}")
return ok, reason
def reset_cache() -> None:
"""Invalidate the cache (tests / forced recheck)."""
_cache.ts = 0.0
_cache.ok = False
_cache.reason = "reset"

246
src/queue_worker.py Normal file
View File

@@ -0,0 +1,246 @@
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
A single background thread polls the `jobs` table and spawns agents:
while running:
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
if not preflight.ok: sleep; continue # CLI/net down -> wait
while count_running_jobs() < max_concurrency:
job = claim_next_job() # atomic queued -> running (available_at-gated)
if not job: break
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
sleep(poll_interval)
Resilience (ДОПОЛНЕНИЕ):
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
B/C. The launcher classifies failures (transient vs permanent) and applies
backoff via available_at; the worker only needs to honour available_at
(claim_next_job does) and react to transient outcomes via the breaker.
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
Design: plain daemon thread + threading.Event (the launcher already manages its
own monitor/watchdog threads + blocking Popen).
"""
import time
import logging
import threading
from .config import settings
from .db import claim_next_job, count_running_jobs
from .agents.launcher import launcher
from . import preflight
logger = logging.getLogger("orchestrator.queue_worker")
class CircuitBreaker:
"""Trips after `threshold` consecutive transient failures.
States: closed -> (threshold transient) -> open -> (after pause) half-open
-> (recovered) closed | (transient again) open.
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
guards the counters).
"""
def __init__(self, threshold: int = None, pause_seconds: int = None):
self.threshold = threshold if threshold is not None else settings.breaker_threshold
self.pause_seconds = (
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
)
self._lock = threading.Lock()
self.state = "closed" # closed | open | half-open
self.consecutive_transient = 0
self.opened_at = 0.0
self._notify = None # optional callable(message) for alerts
def set_notifier(self, fn):
self._notify = fn
def record_transient(self):
with self._lock:
self.consecutive_transient += 1
if self.state == "half-open":
# Probe failed -> re-open.
self._open("circuit re-opened: probe job hit transient again")
elif self.consecutive_transient >= self.threshold and self.state == "closed":
self._open(
f"circuit OPEN: {self.consecutive_transient} consecutive "
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
)
def record_recovered(self):
with self._lock:
self.consecutive_transient = 0
if self.state in ("half-open", "open"):
self.state = "closed"
logger.info("Circuit CLOSED: recovered")
def record_permanent(self):
# A clean permanent (code-fault) failure breaks the transient streak.
with self._lock:
self.consecutive_transient = 0
def _open(self, msg: str):
self.state = "open"
self.opened_at = time.time()
logger.warning(msg)
if self._notify:
try:
self._notify(f"\U0001f534 {msg}")
except Exception:
pass
def allow_claim(self) -> bool:
"""Return True if the worker may attempt to claim/launch a job now.
- closed -> yes.
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
- half-open -> yes (the single probe).
"""
with self._lock:
if self.state == "closed":
return True
if self.state == "open":
if (time.time() - self.opened_at) >= self.pause_seconds:
self.state = "half-open"
logger.info("Circuit HALF-OPEN: probing one job")
return True
return False
# half-open: allow the probe.
return True
def snapshot(self) -> dict:
with self._lock:
remaining = 0
if self.state == "open":
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
return {
"state": self.state,
"consecutive_transient": self.consecutive_transient,
"pause_remaining_s": remaining,
}
class QueueWorker:
"""Background worker that drains the persistent job queue (with resilience)."""
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
breaker: CircuitBreaker = None):
self.max_concurrency = (
max_concurrency if max_concurrency is not None else settings.max_concurrency
)
self.poll_interval = (
poll_interval if poll_interval is not None else settings.queue_poll_interval
)
self.breaker = breaker or CircuitBreaker()
self.last_preflight_ok = True
self.last_preflight_reason = "not checked"
self._stop = threading.Event()
self._thread: threading.Thread | None = None
# --- circuit breaker outcome callback wired into the launcher ----------
def _on_outcome(self, transient: bool, recovered: bool):
if recovered:
self.breaker.record_recovered()
elif transient:
self.breaker.record_transient()
else:
self.breaker.record_permanent()
def _drain_once(self):
"""Claim and launch jobs until concurrency is full or the queue is empty.
Gated by the circuit breaker and preflight: if the breaker is open (and
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
queued and no CLI/tokens are touched.
"""
if not self.breaker.allow_claim():
return
ok, reason = preflight.check()
self.last_preflight_ok = ok
self.last_preflight_reason = reason
if not ok:
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
return
# In half-open we only probe a single job, regardless of max_concurrency.
half_open = self.breaker.snapshot()["state"] == "half-open"
launched = 0
while not self._stop.is_set():
if half_open and launched >= 1:
return
if count_running_jobs() >= self.max_concurrency:
return
job = claim_next_job()
if not job:
return
launched += 1
try:
run_id = launcher.launch_job(job)
logger.info(
f"Worker launched job {job['id']} ({job['agent']}, "
f"repo {job['repo']}) -> run_id={run_id}"
)
except Exception as e:
# Launch itself failed (e.g. repo missing): treat as a permanent
# launch error so the job does not wedge as 'running' forever.
logger.error(f"Worker failed to launch job {job['id']}: {e}")
try:
from .db import get_job, mark_job
j = get_job(job["id"])
attempts = j.get("attempts", 0) if j else 0
max_attempts = j.get("max_attempts", 2) if j else 2
if attempts < max_attempts:
mark_job(job["id"], "queued", error=f"launch error: {e}")
else:
mark_job(job["id"], "failed", error=f"launch error: {e}")
except Exception:
pass
def _run(self):
logger.info(
f"Queue worker started (max_concurrency={self.max_concurrency}, "
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
)
while not self._stop.is_set():
try:
self._drain_once()
except Exception as e:
logger.error(f"Queue worker loop error: {e}")
self._stop.wait(self.poll_interval)
logger.info("Queue worker stopped")
def start(self):
if self._thread and self._thread.is_alive():
return
# Wire breaker alerting + launcher outcome callback.
try:
from .notifications import send_telegram
self.breaker.set_notifier(send_telegram)
except Exception:
pass
launcher.on_outcome = self._on_outcome
self._stop.clear()
self._thread = threading.Thread(
target=self._run, name="queue-worker", daemon=True
)
self._thread.start()
def stop(self, timeout: float = 5.0):
self._stop.set()
if self._thread:
self._thread.join(timeout=timeout)
def status(self) -> dict:
"""Resilience snapshot for /queue."""
return {
"breaker": self.breaker.snapshot(),
"preflight_ok": self.last_preflight_ok,
"preflight_reason": self.last_preflight_reason,
}
# Module-level singleton used by the FastAPI lifespan.
worker = QueueWorker()

425
src/stage_engine.py Normal file
View File

@@ -0,0 +1,425 @@
"""Unified stage engine (ORCH-4 / M-3).
Single source of truth for "an agent finished / a human approved -> run the
stage's quality gate and either advance the pipeline or roll it back".
Before ORCH-4 this logic was duplicated in two places that had silently
diverged:
- src/agents/launcher.py::_try_advance_stage (sync, rich business logic:
analyst approved-flow, reviewer REQUEST_CHANGES rollback+retry, tester FAIL
rollback+retry, architect conflict rollback) — but it picked the next agent
with get_agent_for_stage(next_stage), which is WRONG.
- src/webhooks/plane.py::_try_advance_stage (async, leaner, but it had the
check_review_approved PR-by-branch dispatch and used the CORRECT
get_agent_for_stage(current_stage)).
This module merges both into one sync `advance_stage(...)`. launcher calls it
directly; the plane webhook calls it through asyncio.to_thread so there is
exactly one implementation.
Agent-selection bug fix (ORCH-4):
stages.py defines `agent` as "the agent to launch when advancing FROM this
stage". So when advancing current -> next, the correct agent to launch is
get_agent_for_stage(current_stage). launcher's old next_stage lookup skipped a
stage (e.g. analysis->architecture launched 'developer' instead of
'architect'). plane and gitea already used current_stage; we unify on that.
"""
import logging
import os
from dataclasses import dataclass, field
from .db import get_db, update_task_stage, enqueue_job
from .stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from .git_worktree import get_worktree_path
from .qg.checks import QG_CHECKS
from .notifications import (
notify_stage_change,
notify_qg_failure,
notify_approve_requested,
send_telegram,
)
from .plane_sync import (
notify_stage_change as plane_notify_stage,
notify_qg_failure as plane_notify_qg,
add_comment as plane_add_comment,
set_issue_in_review,
set_issue_needs_input,
set_issue_in_progress,
set_issue_blocked,
)
from .config import settings
logger = logging.getLogger("orchestrator.stage_engine")
MAX_DEVELOPER_RETRIES = 3
@dataclass
class AdvanceResult:
"""Outcome of an advance_stage() call (mostly for tests/observability)."""
advanced: bool = False
from_stage: str | None = None
to_stage: str | None = None
enqueued_agent: str | None = None
enqueued_job_id: int | None = None
qg_name: str | None = None
qg_passed: bool | None = None
qg_reason: str | None = None
rolled_back_to: str | None = None
alerted: bool = False
note: str | None = None
notes: list = field(default_factory=list)
def _run_qg(qg_name: str, repo: str, work_item_id: str, branch: str):
"""Dispatch a quality-gate check to the right signature and run it.
Signatures (unified from launcher + plane):
- check_ci_green / check_tests_local -> (repo, branch)
- check_review_approved -> (repo, pr_number) [PR found by branch]
- everything else (artifact checks) -> (repo, work_item_id, branch)
Returns (passed: bool, reason: str).
"""
check_fn = QG_CHECKS.get(qg_name)
if not check_fn:
logger.error(f"QG function '{qg_name}' not found in registry")
return False, f"Unknown QG: {qg_name}"
if qg_name in ("check_ci_green", "check_tests_local"):
# (repo, branch) — already worktree-aware.
return check_fn(repo, branch)
if qg_name == "check_review_approved":
# Special case kept from plane: find the open PR for this branch via
# Gitea, then check it; fall back to a file-based review marker.
return _check_review_approved_by_branch(check_fn, repo, work_item_id, branch)
# All other artifact checks: (repo, work_item_id, branch). Pass branch so the
# check reads from the task worktree (ORCH-2 / S-4).
return check_fn(repo, work_item_id or "", branch)
def _check_review_approved_by_branch(check_fn, repo: str, work_item_id: str, branch: str):
"""check_review_approved dispatch preserved from plane._try_advance_stage.
Finds the open PR whose head ref == branch via the Gitea API and runs
check_review_approved(repo, pr_number). If no open PR exists, falls back to a
file-based review marker (12-review.md / 09-review.md) like the original.
"""
import httpx as _httpx
owner = settings.gitea_owner
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50"
headers = {"Authorization": f"token {settings.gitea_token}"}
try:
resp = _httpx.get(url, headers=headers, timeout=10)
prs = resp.json()
pr_number = None
for pr in prs:
if pr.get("head", {}).get("ref") == branch:
pr_number = pr["number"]
break
if pr_number:
return check_fn(repo, pr_number)
# No open PR but a review file may exist — check file-based.
wt = get_worktree_path(repo, branch)
if not os.path.isdir(wt):
wt = os.path.join(settings.repos_dir, repo)
review_path = os.path.join(wt, f"docs/work-items/{work_item_id}/12-review.md")
review_path2 = os.path.join(wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(review_path) or os.path.isfile(review_path2):
return True, "Review file exists (file-based approval)"
return False, "No open PR found and no review file"
except Exception as e:
return False, f"Error finding PR: {e}"
def _developer_retry_count(task_id: int) -> int:
"""How many developer runs have already happened for this task."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,),
).fetchone()[0]
conn.close()
return n
def advance_stage(
task_id: int,
current_stage: str,
repo: str,
work_item_id: str,
branch: str,
finished_agent: str | None = None,
) -> AdvanceResult:
"""Run the current stage's quality gate and advance / roll back the pipeline.
This is the single merged implementation (ORCH-4 / M-3). It is synchronous;
the async plane webhook calls it via asyncio.to_thread.
Args:
task_id: tasks.id
current_stage: the stage the task is currently in
repo: repository name
work_item_id: Plane work item id (may be "" / None)
branch: feature branch
finished_agent: the agent that just finished (launcher path). Drives the
approved/REQUEST_CHANGES/tester/architect branches. In the
plane webhook path it is None, so those agent-specific
branches simply do not trigger (matches old plane behavior).
Returns AdvanceResult describing what happened.
"""
result = AdvanceResult(from_stage=current_stage)
agent = finished_agent
try:
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
result.qg_name = qg_name
result.to_stage = next_stage
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
result.note = "terminal"
return result
# --- Quality gate ----------------------------------------------------
if qg_name and qg_name in QG_CHECKS:
# Human-approval gate: special analyst approved-flow (launcher only).
if qg_name == "check_analysis_approved":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
if not passed:
logger.info(
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
elif qg_name:
# QG name set but not registered — do not advance (launcher behavior).
result.note = f"qg '{qg_name}' not in registry"
return result
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
result.advanced = True
logger.info(
f"Task {task_id}: {current_stage} -> {next_stage} "
f"(auto-advance after {agent})"
)
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
next_agent = get_agent_for_stage(current_stage)
if next_agent:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\n"
f"Branch: {branch}\nStage: {next_stage}"
)
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
result.enqueued_agent = next_agent
result.enqueued_job_id = new_job_id
logger.info(
f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})"
)
return result
except Exception as e:
logger.error(f"advance_stage failed for task_id={task_id}: {e}")
result.note = f"error: {e}"
return result
def _handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
):
"""Analyst approved-flow (launcher only).
Only triggers when the analyst just finished (agent == 'analyst') in the
launcher path. Decides between: artifacts ready -> In Review + request
:approved:; questions file -> Needs Input; otherwise a warning comment.
This gate never advances on its own (human approval does that via the plane
webhook), matching the original launcher behavior.
"""
result.qg_name = "check_analysis_approved"
result.note = "analysis-approval-gate"
if not (agent == "analyst" and work_item_id):
return
files_check = QG_CHECKS.get("check_analysis_complete")
if not files_check:
return
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review, ask for :approved:.
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
)
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested :approved: in Plane"
)
return
questions_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/01-questions.md",
)
if os.path.isfile(questions_path):
set_issue_needs_input(work_item_id)
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}",
)
send_telegram(
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
)
result.note = "analysis-needs-input"
return
# No artifacts and no questions.
plane_add_comment(
work_item_id,
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433.",
)
result.note = "analysis-empty"
def _handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result: AdvanceResult,
):
"""All rollback/retry branches from the original launcher, preserved verbatim.
Only fire on the launcher path (finished_agent is set). The webhook path
passes finished_agent=None, so none of these agent-specific branches trigger
— that matches the old plane behavior (it just reported the QG failure).
"""
# Reviewer REQUEST_CHANGES -> rollback to development + retry (max 3).
if agent == "reviewer" and "REQUEST_CHANGES" in (reason or ""):
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer "
f"(job_id={new_job})"
)
else:
send_telegram(
f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. "
f"Manual intervention needed."
)
result.alerted = True
logger.error(f"Task {task_id}: max retries reached")
# Tester check_tests_passed FAIL -> rollback to development + retry (max 3).
if agent == "tester" and qg_name == "check_tests_passed":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. "
f"Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
)
retry_count = _developer_retry_count(task_id)
if retry_count < MAX_DEVELOPER_RETRIES:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
result.enqueued_agent = "developer"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})"
)
else:
set_issue_blocked(work_item_id)
send_telegram(
f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer "
f"retries. Manual intervention needed."
)
result.alerted = True
# Architect conflict (10-conflict.md exists) -> rollback to analysis.
if agent == "architect" and qg_name == "check_architecture_done":
conflict_path = os.path.join(
get_worktree_path(repo, branch),
f"docs/work-items/{work_item_id}/10-conflict.md",
)
if os.path.isfile(conflict_path):
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
result.rolled_back_to = "analysis"
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. "
f"\u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}",
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
result.enqueued_agent = "analyst"
result.enqueued_job_id = new_job
logger.info(
f"Task {task_id}: architect conflict, enqueued analyst "
f"(job_id={new_job})"
)

View File

@@ -10,7 +10,7 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -123,8 +123,8 @@ async def handle_push(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
@@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
@@ -272,8 +272,8 @@ async def handle_pr(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
@@ -297,8 +297,8 @@ async def handle_pr(payload: dict):
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})")
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
else:

View File

@@ -14,6 +14,7 @@ from ..db import (
get_task_by_plane_id,
get_next_work_item_id,
update_task_stage,
enqueue_job,
)
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
@@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
if task_row:
task_id = task_row[0]
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: launched analyst (run_id={run_id})")
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
@@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""):
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
else:
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
@@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""):
f"Read the latest comment in Plane and revise your artifacts.\n"
f"Answer: {comment_body[:500]}"
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
@@ -317,81 +318,30 @@ async def handle_comment(data: dict, project_id: str = ""):
async def _try_advance_stage(
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str
):
"""Run QG check for current stage and advance if passed."""
qg_name = get_qg_for_stage(current_stage)
next_stage = get_next_stage(current_stage)
"""Thin async wrapper over the unified stage engine (ORCH-4 / M-3).
if not next_stage:
logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'")
return
The QG dispatch (including the check_review_approved PR-by-branch logic) and
the advance/launch logic now live in src/stage_engine.advance_stage(), which
is synchronous. We run it off the event loop via asyncio.to_thread so there
is exactly one implementation shared with the launcher.
# Run QG check if one is required
if qg_name:
qg_func = QG_CHECKS.get(qg_name)
if not qg_func:
logger.error(f"QG function '{qg_name}' not found in registry")
return
finished_agent is None on this webhook path (a human :approved: comment, not
a finished agent), so the agent-specific rollback branches inside the engine
intentionally do not trigger — identical to the old plane behavior, which
only ran the QG and either advanced or reported the failure.
"""
import asyncio
from ..stage_engine import advance_stage
# Determine args based on QG function
if qg_name in ("check_analysis_approved", "check_analysis_complete", "check_architecture_done", "check_tests_passed", "check_reviewer_verdict"):
# ORCH-2 / S-4: pass branch so artifacts are read from the task worktree.
passed, reason = qg_func(repo, work_item_id, branch)
elif qg_name in ("check_ci_green", "check_tests_local"):
passed, reason = qg_func(repo, branch)
elif qg_name == "check_review_approved":
# Find PR number by branch via Gitea API
import httpx as _httpx
from ..config import settings as _s
_owner = _s.gitea_owner
_url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50"
_headers = {"Authorization": f"token {_s.gitea_token}"}
try:
_resp = _httpx.get(_url, headers=_headers, timeout=10)
_prs = _resp.json()
_pr_number = None
for _pr in _prs:
if _pr.get("head", {}).get("ref") == branch:
_pr_number = _pr["number"]
break
if _pr_number:
passed, reason = qg_func(repo, _pr_number)
else:
# No open PR but review file exists — check file-based
import os
from ..git_worktree import get_worktree_path as _gwp
_wt = _gwp(repo, branch) if os.path.isdir(_gwp(repo, branch)) else os.path.join(_s.repos_dir, repo)
_review_path = os.path.join(_wt, f"docs/work-items/{work_item_id}/12-review.md")
_review_path2 = os.path.join(_wt, f"docs/work-items/{work_item_id}/09-review.md")
if os.path.isfile(_review_path) or os.path.isfile(_review_path2):
passed, reason = True, "Review file exists (file-based approval)"
else:
passed, reason = False, "No open PR found and no review file"
except Exception as _e:
passed, reason = False, f"Error finding PR: {_e}"
else:
passed, reason = False, f"Unknown QG: {qg_name}"
if not passed:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
return
# Advance stage
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
# Launch agent associated with the current stage's transition
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo, task_desc, task_id=task_id)
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
logger.error(f"Agent launch failed: {e}")
await asyncio.to_thread(
advance_stage,
task_id,
current_stage,
repo,
work_item_id,
branch,
None,
)
async def _create_gitea_branch(repo: str, branch: str):

View File

@@ -7,6 +7,7 @@ Covers the audit-2026-06-02 fixes:
the YAML frontmatter only (no fragile substring matching).
"""
import os
import signal
import tempfile
import pytest
@@ -20,6 +21,7 @@ os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
from src.agents.launcher import AgentLauncher
from src.qg.checks import check_reviewer_verdict
from src.config import settings
# ---------------------------------------------------------------------------
@@ -138,3 +140,141 @@ class TestCheckReviewerVerdict:
passed, reason = check_reviewer_verdict("enduro-trails", "ET-999")
assert passed is False
assert "not found" in reason.lower()
# ---------------------------------------------------------------------------
# ORCH-7 (M-4): dead code removed
# ---------------------------------------------------------------------------
class TestDeadCodeRemoved:
"""M-4: _auto_merge_pr was never called (merge is the deployer's job) and is
removed. _ensure_pr (used by the auto-advance path) must stay."""
def test_auto_merge_pr_is_gone(self):
assert not hasattr(AgentLauncher, "_auto_merge_pr")
def test_ensure_pr_still_present(self):
assert hasattr(AgentLauncher, "_ensure_pr")
# ---------------------------------------------------------------------------
# ORCH-7 (M-2): configurable timeout + per-agent override
# ---------------------------------------------------------------------------
class TestResolveTimeout:
"""M-2: _resolve_timeout honours a per-agent JSON override, else the default."""
def test_default_when_no_override(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "")
assert AgentLauncher._resolve_timeout("developer") == 1800
assert AgentLauncher._resolve_timeout(None) == 1800
def test_override_for_specific_agent(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(
settings, "agent_timeout_overrides_json", '{"reviewer": 3600, "architect": 2700}'
)
assert AgentLauncher._resolve_timeout("reviewer") == 3600
assert AgentLauncher._resolve_timeout("architect") == 2700
# an agent not in the override map falls back to the default
assert AgentLauncher._resolve_timeout("developer") == 1800
def test_malformed_override_falls_back_to_default(self, monkeypatch):
monkeypatch.setattr(settings, "agent_timeout_seconds", 1800)
monkeypatch.setattr(settings, "agent_timeout_overrides_json", "{not-json")
# must not raise, must return the default
assert AgentLauncher._resolve_timeout("reviewer") == 1800
class TestWatchdogGracefulKill:
"""M-2: SIGTERM -> grace -> SIGKILL ordering, with graceful-exit short-circuit
and ProcessLookupError tolerance. The OS process is fully faked: we record the
signals sent and decide liveness from a script, so no real process is touched."""
def _patch_db(self, monkeypatch):
"""Stub get_db so _record_kill does not need a real DB."""
class _Conn:
def execute(self, *a, **k):
return self
def commit(self):
pass
def close(self):
pass
monkeypatch.setattr("src.agents.launcher.get_db", lambda: _Conn())
def test_sigterm_then_sigkill_after_grace(self, monkeypatch):
"""Process stays alive through the whole grace window -> SIGTERM then SIGKILL."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 1)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
sent.append(sig)
# signal 0 (liveness probe) -> always alive; never raise
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=1, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL in sent
# SIGTERM must come before SIGKILL
assert sent.index(signal.SIGTERM) < sent.index(signal.SIGKILL)
def test_graceful_exit_in_grace_skips_sigkill(self, monkeypatch):
"""Process dies during the grace window -> SIGKILL is NOT sent."""
self._patch_db(monkeypatch)
monkeypatch.setattr(settings, "agent_kill_grace_seconds", 5)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
state = {"alive": True, "probes": 0}
def fake_kill(pid, sig):
if sig == 0:
state["probes"] += 1
# die on the 2nd liveness probe (within grace)
if state["probes"] >= 2:
raise ProcessLookupError
return None
sent.append(sig)
return None
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
launcher._watchdog(pid=4242, run_id=2, timeout=0, agent="developer")
assert signal.SIGTERM in sent
assert signal.SIGKILL not in sent
def test_already_dead_before_sigterm(self, monkeypatch):
"""Process already gone at SIGTERM -> ProcessLookupError tolerated, no SIGKILL,
and _record_kill is NOT called (the monitor's proc.wait owns the exit)."""
self._patch_db(monkeypatch)
monkeypatch.setattr("src.agents.launcher.time.sleep", lambda s: None)
sent = []
def fake_kill(pid, sig):
if sig == signal.SIGTERM:
raise ProcessLookupError
sent.append(sig)
return None
recorded = {"called": False}
monkeypatch.setattr(
AgentLauncher, "_record_kill",
staticmethod(lambda rid: recorded.__setitem__("called", True)),
)
monkeypatch.setattr("src.agents.launcher.os.kill", fake_kill)
launcher = AgentLauncher()
# must not raise
launcher._watchdog(pid=4242, run_id=3, timeout=0, agent="developer")
assert signal.SIGKILL not in sent
assert recorded["called"] is False

304
tests/test_queue.py Normal file
View File

@@ -0,0 +1,304 @@
"""Tests for ORCH-1 (F-2b) persistent job queue.
Covers:
- enqueue_job -> claim_next_job -> mark_job lifecycle
- claim_next_job atomicity (no double-dispatch of the same job)
- retry: fail -> requeue while attempts < max_attempts, then failed
- requeue_running_jobs (queue-recovery)
- count_running_jobs / job_status_counts / recent_jobs
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
tests, and the launcher finalize logic is exercised directly via mark_job.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_qg.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import (
init_db,
enqueue_job,
claim_next_job,
mark_job,
count_running_jobs,
requeue_running_jobs,
get_job,
job_status_counts,
recent_jobs,
)
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
"""Point the DB at a fresh per-test sqlite file and init the schema."""
dbfile = tmp_path / "queue.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
yield
# ---------------------------------------------------------------------------
# enqueue / claim / mark lifecycle
# ---------------------------------------------------------------------------
class TestLifecycle:
def test_enqueue_creates_queued_job(self):
jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
job = get_job(jid)
assert job["status"] == "queued"
assert job["agent"] == "analyst"
assert job["repo"] == "enduro-trails"
assert job["task_content"] == "task body"
assert job["task_id"] == 7
assert job["attempts"] == 0
assert job["max_attempts"] == 2
def test_claim_marks_running_and_increments_attempts(self):
jid = enqueue_job("developer", "repo")
claimed = claim_next_job()
assert claimed is not None
assert claimed["id"] == jid
assert claimed["status"] == "running"
assert claimed["attempts"] == 1
assert count_running_jobs() == 1
def test_claim_empty_queue_returns_none(self):
assert claim_next_job() is None
def test_claim_is_fifo(self):
a = enqueue_job("analyst", "r")
b = enqueue_job("developer", "r")
assert claim_next_job()["id"] == a
assert claim_next_job()["id"] == b
def test_mark_done(self):
jid = enqueue_job("tester", "r")
claim_next_job()
mark_job(jid, "done", run_id=42)
job = get_job(jid)
assert job["status"] == "done"
assert job["run_id"] == 42
assert job["finished_at"] is not None
assert count_running_jobs() == 0
def test_mark_failed_records_error(self):
jid = enqueue_job("tester", "r")
claim_next_job()
mark_job(jid, "failed", run_id=9, error="boom")
job = get_job(jid)
assert job["status"] == "failed"
assert job["error"] == "boom"
assert job["finished_at"] is not None
# ---------------------------------------------------------------------------
# claim atomicity — no double dispatch
# ---------------------------------------------------------------------------
class TestClaimAtomicity:
def test_single_job_claimed_once(self):
jid = enqueue_job("analyst", "r")
first = claim_next_job()
second = claim_next_job()
assert first["id"] == jid
assert second is None # already running, not re-dispatched
def test_concurrent_claims_no_duplicate(self):
"""Many enqueued jobs claimed from parallel threads -> each claimed once."""
import threading
n = 20
for _ in range(n):
enqueue_job("developer", "r")
claimed_ids = []
lock = threading.Lock()
def grab():
while True:
job = claim_next_job()
if job is None:
return
with lock:
claimed_ids.append(job["id"])
threads = [threading.Thread(target=grab) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(claimed_ids) == n
assert len(set(claimed_ids)) == n # no id claimed twice
assert count_running_jobs() == n
# ---------------------------------------------------------------------------
# retry semantics (mirrors launcher._finalize_job logic)
# ---------------------------------------------------------------------------
class TestRetry:
def test_fail_requeues_while_under_max(self):
jid = enqueue_job("developer", "r", max_attempts=2)
job = claim_next_job() # attempts=1
assert job["attempts"] == 1
# attempts(1) < max(2) -> requeue
mark_job(jid, "queued", error="exit 1")
j = get_job(jid)
assert j["status"] == "queued"
assert j["error"] == "exit 1"
assert j["started_at"] is None # requeue clears started_at
def test_fail_fails_when_max_reached(self):
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job() # attempts=1 -> requeue
mark_job(jid, "queued")
job2 = claim_next_job() # attempts=2
assert job2["attempts"] == 2
# attempts(2) >= max(2) -> failed
mark_job(jid, "failed", error="exit 1")
assert get_job(jid)["status"] == "failed"
def test_finalize_job_done(self):
"""launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
from src.agents.launcher import AgentLauncher
jid = enqueue_job("analyst", "r")
claim_next_job()
AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
assert get_job(jid)["status"] == "done"
def test_finalize_job_requeue_then_fail(self, monkeypatch):
from src.agents.launcher import AgentLauncher
# Silence telegram side-effect.
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
lr = AgentLauncher()
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job() # attempts=1
lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue
claim_next_job() # attempts=2
lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed
# ---------------------------------------------------------------------------
# queue-recovery
# ---------------------------------------------------------------------------
class TestRequeueRunning:
def test_requeue_running_jobs(self):
a = enqueue_job("analyst", "r")
b = enqueue_job("developer", "r")
claim_next_job() # a -> running
claim_next_job() # b -> running
assert count_running_jobs() == 2
n = requeue_running_jobs()
assert n == 2
assert count_running_jobs() == 0
assert get_job(a)["status"] == "queued"
assert get_job(b)["status"] == "queued"
def test_requeue_preserves_attempts(self):
jid = enqueue_job("analyst", "r")
claim_next_job() # attempts=1
requeue_running_jobs()
assert get_job(jid)["attempts"] == 1 # not reset
# ---------------------------------------------------------------------------
# observability helpers
# ---------------------------------------------------------------------------
class TestObservability:
def test_status_counts(self):
enqueue_job("analyst", "r") # stays queued
enqueue_job("developer", "r") # first claimed -> running (FIFO)
claim_next_job()
counts = job_status_counts()
assert counts["running"] == 1
assert counts["queued"] == 1
assert counts["done"] == 0
assert counts["failed"] == 0
def test_recent_jobs_desc(self):
ids = [enqueue_job("analyst", "r") for _ in range(3)]
recent = recent_jobs(10)
assert [r["id"] for r in recent] == sorted(ids, reverse=True)
# ---------------------------------------------------------------------------
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
# ---------------------------------------------------------------------------
class TestWorkerConcurrency:
@pytest.fixture(autouse=True)
def _ok_preflight(self, monkeypatch):
# ORCH-1 resilience: the worker gates claims behind preflight; in tests there
# is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
def test_worker_respects_max_concurrency(self, monkeypatch):
from src.queue_worker import QueueWorker
launched = []
def fake_launch_job(job):
# Simulate a long-running agent: the job stays 'running' (we do NOT
# mark it done), so the slot remains occupied.
launched.append(job["id"])
return 100 + job["id"]
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
for _ in range(5):
enqueue_job("developer", "r")
w = QueueWorker(max_concurrency=2, poll_interval=0.01)
w._drain_once()
# Only max_concurrency jobs may be launched / running at once.
assert len(launched) == 2
assert count_running_jobs() == 2
def test_worker_drains_as_slots_free(self, monkeypatch):
from src.queue_worker import QueueWorker
def fake_launch_job(job):
# Immediately complete the job so the slot frees for the next claim.
mark_job(job["id"], "done", run_id=job["id"])
return job["id"]
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
for _ in range(4):
enqueue_job("analyst", "r")
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
w._drain_once()
# With instant completion and concurrency 1, one drain pass empties the queue.
assert job_status_counts()["done"] == 4
assert count_running_jobs() == 0
def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
from src.queue_worker import QueueWorker
def boom(job):
raise RuntimeError("repo missing")
monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
enqueue_job("developer", "r", max_attempts=1)
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
w._drain_once()
# attempts=1 >= max_attempts=1 -> failed, not stuck running.
assert count_running_jobs() == 0
counts = job_status_counts()
assert counts["failed"] == 1

295
tests/test_resilience.py Normal file
View File

@@ -0,0 +1,295 @@
"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
are mocked. DB is a fresh per-test sqlite file.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import (
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
mark_job_transient,
)
from src import preflight, error_classifier
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
from src.queue_worker import QueueWorker, CircuitBreaker
from src.agents.launcher import AgentLauncher
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
init_db()
preflight.reset_cache()
yield
# ---------------------------------------------------------------------------
# A. Preflight
# ---------------------------------------------------------------------------
class TestPreflight:
def test_fail_when_bin_missing(self, monkeypatch):
monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
ok, reason = preflight.check(force=True)
assert ok is False
assert "not found" in reason.lower()
def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("#!/bin/sh\necho v1\n")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
ok, reason = preflight.check(force=True)
assert ok is True
def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("x")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
calls = {"n": 0}
def counting_version(b):
calls["n"] += 1
return True, "ok"
monkeypatch.setattr(preflight, "_run_version", counting_version)
preflight.reset_cache()
preflight.check() # first -> runs version
preflight.check() # cached -> no extra version call
preflight.check()
assert calls["n"] == 1
def test_force_bypasses_cache(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("x")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
calls = {"n": 0}
monkeypatch.setattr(preflight, "_run_version",
lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1])
preflight.reset_cache()
preflight.check()
preflight.check(force=True)
assert calls["n"] == 2
def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
# Preflight FAIL -> job stays queued, launch_job never called.
monkeypatch.setattr("src.queue_worker.preflight.check",
lambda *a, **k: (False, "down"))
called = {"launch": False}
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
lambda job: called.__setitem__("launch", True))
jid = enqueue_job("analyst", "r")
QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
assert called["launch"] is False
assert get_job(jid)["status"] == "queued"
assert count_running_jobs() == 0
# ---------------------------------------------------------------------------
# B. Error classifier
# ---------------------------------------------------------------------------
class TestClassifier:
@pytest.mark.parametrize("text", [
"Error: 429 Too Many Requests",
"anthropic rate limit exceeded",
"overloaded_error: server is overloaded",
"API quota exhausted",
"503 Service Unavailable",
"connection reset by peer",
])
def test_transient_patterns(self, text):
assert classify_text(text) == "transient"
@pytest.mark.parametrize("text", [
"Traceback: KeyError 'foo'",
"SyntaxError: invalid syntax",
"assertion failed in test",
"",
])
def test_permanent_patterns(self, text):
assert classify_text(text) == "permanent"
def test_retry_after_header(self):
assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42
def test_retry_after_json(self):
assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7
def test_retry_after_absent(self):
assert parse_retry_after("just an error") is None
def test_classify_log_file(self, tmp_path):
p = tmp_path / "run.log"
p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
kind, ra = classify_log_file(str(p))
assert kind == "transient"
assert ra == 30
def test_classify_missing_file_is_permanent(self):
kind, ra = classify_log_file("/no/such/log")
assert kind == "permanent"
assert ra is None
# ---------------------------------------------------------------------------
# C. Backoff + available_at gating
# ---------------------------------------------------------------------------
class TestBackoff:
def test_backoff_grows_exponentially(self):
lr = AgentLauncher()
# base=10, cap=600 (defaults)
b1 = lr._backoff_seconds(1)
b2 = lr._backoff_seconds(2)
b3 = lr._backoff_seconds(3)
assert b1 == 20 # 2^1*10
assert b2 == 40 # 2^2*10
assert b3 == 80 # 2^3*10
assert b2 > b1 and b3 > b2
def test_backoff_capped(self):
lr = AgentLauncher()
assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds
def test_retry_after_respected_when_larger(self):
lr = AgentLauncher()
# transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
assert lr._backoff_seconds(1, retry_after=120) == 120
def test_retry_after_ignored_when_smaller(self):
lr = AgentLauncher()
assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger
def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
jid = enqueue_job("developer", "r")
claim_next_job()
# Big backoff -> available_at far in the future.
mark_job_transient(jid, 3600, error="429")
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 1
assert job["available_at"] is not None
# claim must NOT pick it up while available_at is in the future.
assert claim_next_job() is None
def test_transient_requeue_claimable_when_due(self):
jid = enqueue_job("developer", "r")
claim_next_job()
mark_job_transient(jid, -5, error="429") # available_at in the past
c = claim_next_job()
assert c is not None and c["id"] == jid
# ---------------------------------------------------------------------------
# D. Launcher transient/permanent finalize (no Popen)
# ---------------------------------------------------------------------------
class TestFinalizeClassified:
def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
log = tmp_path / "1.log"
log.write_text("Error 429 rate limit exceeded\n")
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job()
AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1,
output_path=str(log))
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 1
assert job["available_at"] is not None # backoff-gated
assert job["attempts"] == 1 # code-fault budget NOT burned
def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
log = tmp_path / "2.log"
log.write_text("Traceback: ValueError\n")
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job()
AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1,
output_path=str(log))
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 0 # not transient
assert job["available_at"] is None # no backoff for code-fault
def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
log = tmp_path / "3.log"
log.write_text("overloaded_error\n")
lr = AgentLauncher()
jid = enqueue_job("developer", "r")
claim_next_job()
lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log))
assert get_job(jid)["status"] == "queued" # transient 1 -> requeue
# force claimable and retry
mark_job_transient(jid, -1) # makes it due; transient=2 now
claim_next_job()
lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log))
assert get_job(jid)["status"] == "failed" # transient budget exhausted
# ---------------------------------------------------------------------------
# E. Circuit breaker
# ---------------------------------------------------------------------------
class TestCircuitBreaker:
def test_opens_after_threshold(self):
cb = CircuitBreaker(threshold=3, pause_seconds=300)
assert cb.allow_claim() is True
cb.record_transient()
cb.record_transient()
assert cb.state == "closed"
cb.record_transient() # 3rd -> open
assert cb.state == "open"
assert cb.allow_claim() is False # paused, no CLI calls
def test_recovered_resets_streak(self):
cb = CircuitBreaker(threshold=3)
cb.record_transient()
cb.record_transient()
cb.record_recovered()
assert cb.consecutive_transient == 0
assert cb.state == "closed"
def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
cb = CircuitBreaker(threshold=2, pause_seconds=300)
cb.record_transient()
cb.record_transient() # open
assert cb.state == "open"
# Simulate the pause elapsing.
cb.opened_at -= 301
assert cb.allow_claim() is True # -> half-open (probe)
assert cb.state == "half-open"
cb.record_recovered() # probe succeeded
assert cb.state == "closed"
def test_half_open_reopens_on_transient(self):
cb = CircuitBreaker(threshold=2, pause_seconds=300)
cb.record_transient(); cb.record_transient() # open
cb.opened_at -= 301
cb.allow_claim() # half-open
assert cb.state == "half-open"
cb.record_transient() # probe failed -> re-open
assert cb.state == "open"
def test_breaker_blocks_worker_claim(self, monkeypatch):
monkeypatch.setattr("src.queue_worker.preflight.check",
lambda *a, **k: (True, "ok"))
called = {"launch": False}
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
lambda job: called.__setitem__("launch", True))
cb = CircuitBreaker(threshold=1, pause_seconds=300)
cb.record_transient() # open immediately
w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
enqueue_job("analyst", "r")
w._drain_once()
assert called["launch"] is False # breaker open -> no claim, no CLI

395
tests/test_stage_engine.py Normal file
View File

@@ -0,0 +1,395 @@
"""ORCH-4 / M-3: tests for the unified stage engine (src/stage_engine.advance_stage).
These verify the MERGED behavior of what used to be two diverged
_try_advance_stage implementations (launcher sync + plane async):
* happy-path advance for every stage launches the CORRECT agent
(the ORCH-4 fix: agent = get_agent_for_stage(current_stage), NOT next_stage);
* a QG failure does not advance;
* reviewer REQUEST_CHANGES -> rollback to development + enqueue developer;
* developer retries > 3 -> telegram alert, no further enqueue;
* tester FAIL -> rollback to development + enqueue developer;
* architect conflict (10-conflict.md) -> rollback to analysis + enqueue analyst;
* launcher AND plane both delegate to the engine.
Network/Plane/Telegram side effects are mocked at the src.stage_engine level so
the engine runs against a real isolated sqlite DB.
"""
import os
import tempfile
import pytest
# Isolated test DB (same convention as the other suites).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_stage_engine.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock, patch # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
from src.stages import get_agent_for_stage # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
"""Fresh isolated DB per test."""
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
"""Mock all Plane/Telegram/notification side effects in the engine.
Everything imported into src.stage_engine that touches the network or sends
a message becomes a no-op MagicMock so tests are deterministic and offline.
"""
for name in (
"notify_stage_change",
"notify_qg_failure",
"notify_approve_requested",
"send_telegram",
"plane_notify_stage",
"plane_notify_qg",
"plane_add_comment",
"set_issue_in_review",
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="enduro-trails", branch="feature/ET-001-x", wi="ET-001"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _add_developer_runs(task_id, n):
conn = get_db()
for _ in range(n):
conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (?, 'developer')",
(task_id,),
)
conn.commit()
conn.close()
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
# ---------------------------------------------------------------------------
# Happy path: each stage advances and launches the CORRECT agent (ORCH-4 fix)
# ---------------------------------------------------------------------------
class TestHappyPathAgentSelection:
"""The fixed agent-selection: when advancing FROM current_stage, the engine
must enqueue get_agent_for_stage(current_stage), NOT next_stage.
"""
@pytest.mark.parametrize(
"current_stage,expected_next,expected_agent",
[
("architecture", "development", "developer"),
("development", "review", "reviewer"),
("review", "testing", "tester"),
("testing", "deploy", "deployer"),
],
)
def test_advance_launches_current_stage_agent(
self, monkeypatch, current_stage, expected_next, expected_agent
):
# All QG checks pass for this happy-path suite.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task(current_stage)
res = advance_stage(
task_id, current_stage, "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == expected_next
assert _stage(task_id) == expected_next
# The ORCH-4 fix: correct agent == get_agent_for_stage(current_stage).
assert expected_agent == get_agent_for_stage(current_stage)
assert res.enqueued_agent == expected_agent
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == expected_agent
def test_deploy_to_done_no_agent(self, monkeypatch):
"""deploy -> done advances but launches no agent (terminal-ish)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is True
assert _stage(task_id) == "done"
assert res.enqueued_agent is None
assert _jobs() == []
def test_done_is_terminal(self):
task_id = _make_task("done")
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert res.advanced is False
assert _stage(task_id) == "done"
# ---------------------------------------------------------------------------
# QG failure: do not advance
# ---------------------------------------------------------------------------
class TestQgFailureDoesNotAdvance:
def test_qg_fail_keeps_stage(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.qg_passed is False
assert _stage(task_id) == "architecture"
assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
)
task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None)
assert stage_engine.notify_qg_failure.called
assert stage_engine.plane_notify_qg.called
def test_launcher_path_no_generic_qg_notification(self, monkeypatch):
"""finished_agent set -> NO generic QG notification (launcher parity)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("not done")},
)
task_id = _make_task("architecture")
advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert not stage_engine.notify_qg_failure.called
# ---------------------------------------------------------------------------
# Reviewer REQUEST_CHANGES -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestReviewerRequestChanges:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_alerts_no_enqueue(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_reviewer_verdict": _fail("verdict: REQUEST_CHANGES")},
)
task_id = _make_task("review")
_add_developer_runs(task_id, 3) # already at the max
res = advance_stage(task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="reviewer")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.send_telegram.called
# No new developer job enqueued past the retry cap.
assert _jobs() == []
# ---------------------------------------------------------------------------
# Tester FAIL -> rollback to development + enqueue developer
# ---------------------------------------------------------------------------
class TestTesterFail:
def test_rollback_and_enqueue_developer(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("2 tests failed")},
)
task_id = _make_task("testing")
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "developer"
def test_retry_over_3_blocks_and_alerts(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _fail("still failing")},
)
task_id = _make_task("testing")
_add_developer_runs(task_id, 3)
res = advance_stage(task_id, "testing", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="tester")
assert res.rolled_back_to == "development"
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
class TestArchitectConflict:
def test_conflict_rolls_back_to_analysis(self, monkeypatch, tmp_path):
# 10-conflict.md must exist in the worktree path the engine inspects.
wt = tmp_path / "wt"
conflict_dir = wt / "docs" / "work-items" / "ET-001"
conflict_dir.mkdir(parents=True)
(conflict_dir / "10-conflict.md").write_text("conflict with TRZ")
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("conflict")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to == "analysis"
assert _stage(task_id) == "analysis"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "analyst"
def test_no_conflict_file_no_rollback(self, monkeypatch, tmp_path):
wt = tmp_path / "wt"
(wt / "docs").mkdir(parents=True)
monkeypatch.setattr(stage_engine, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_architecture_done": _fail("incomplete")},
)
task_id = _make_task("architecture")
res = advance_stage(task_id, "architecture", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="architect")
assert res.advanced is False
assert res.rolled_back_to is None
assert _stage(task_id) == "architecture"
assert _jobs() == []
# ---------------------------------------------------------------------------
# Analyst approved-flow (analysis gate): never auto-advances
# ---------------------------------------------------------------------------
class TestAnalysisApprovedFlow:
def test_artifacts_ready_requests_approval_no_advance(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_analysis_complete": _pass},
)
task_id = _make_task("analysis")
res = advance_stage(task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst")
assert res.advanced is False
assert _stage(task_id) == "analysis"
assert stage_engine.set_issue_in_review.called
assert stage_engine.notify_approve_requested.called
assert _jobs() == []
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------
class TestDelegation:
def test_launcher_calls_engine(self):
from src.agents.launcher import AgentLauncher
task_id = _make_task("development", branch="feature/ET-777-deleg")
with patch("src.stage_engine.advance_stage") as m:
AgentLauncher()._try_advance_stage(
run_id=1, agent="developer", repo="enduro-trails",
branch="feature/ET-777-deleg",
)
m.assert_called_once()
kwargs = m.call_args.kwargs
assert kwargs["task_id"] == task_id
assert kwargs["current_stage"] == "development"
assert kwargs["finished_agent"] == "developer"
def test_plane_calls_engine(self):
import asyncio
from src.webhooks import plane as plane_mod
with patch("src.stage_engine.advance_stage") as m:
asyncio.run(
plane_mod._try_advance_stage(
task_id=5, current_stage="analysis", repo="enduro-trails",
work_item_id="ET-001", branch="feature/ET-001-x",
)
)
m.assert_called_once()
# plane passes positional args; finished_agent (last positional) is None.
args = m.call_args.args
assert args[0] == 5
assert args[1] == "analysis"
assert args[-1] is None