Merge pull request 'ORCH-1 (F-2b): persistent job queue instead of in-process daemon threads' (#3) from feature/ORCH-1-job-queue into main

This commit was merged in pull request #3.
This commit is contained in:
2026-06-03 08:09:23 +03:00
14 changed files with 1785 additions and 35 deletions

View File

@@ -39,6 +39,7 @@ created → analysis → architecture → development → review → testing →
|--------|------|----------|
| GET | `/health` | Health check |
| GET | `/status` | Активные задачи (stage != done) |
| GET | `/queue` | Очередь задач (ORCH-1): counts по статусам + max_concurrency + последние 10 jobs |
| POST | `/webhook/plane` | Plane webhook receiver |
| POST | `/webhook/gitea` | Gitea webhook receiver |
@@ -52,8 +53,9 @@ src/
├── stages.py # State machine (transitions, agents, QG)
├── notifications.py # Уведомления (логирование)
├── plane_sync.py # Синхронизация статусов с Plane API
├── queue_worker.py # ORCH-1: фоновый воркер очереди (claim → launch_job)
├── agents/
│ └── launcher.py # AgentLauncher: launch, monitor, watchdog, auto-advance
│ └── launcher.py # AgentLauncher: launch/launch_job, monitor, watchdog, auto-advance
├── webhooks/
│ ├── plane.py # Plane webhook handler
│ └── gitea.py # Gitea webhook handler (push, PR, CI status)
@@ -107,6 +109,36 @@ uvicorn src.main:app --reload --port 8500
| `ORCH_REPOS_DIR` | Repos dir (container) | `/repos` |
| `ORCH_HOST_REPOS_DIR` | Repos dir (host) | `/home/slin/repos` |
| `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` |
| `ORCH_MAX_CONCURRENCY` | Сколько jobs воркер запускает параллельно (ORCH-1) | `1` |
| `ORCH_QUEUE_POLL_INTERVAL` | Период опроса очереди воркером, сек (ORCH-1) | `2.0` |
| `ORCH_PREFLIGHT_CACHE_TTL` | Кэш preflight (CLI/net), сек (ORCH-1 resilience) | `45` |
| `ORCH_BACKOFF_BASE_SECONDS` | База exp-backoff для transient (429) | `10` |
| `ORCH_BACKOFF_MAX_SECONDS` | Потолок backoff | `600` |
| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Ретраи для 429/недоступности | `5` |
| `ORCH_BREAKER_THRESHOLD` | transient подряд до открытия breaker | `3` |
| `ORCH_BREAKER_PAUSE_SECONDS` | Пауза при открытом breaker | `300` |
## Очередь задач (ORCH-1 / F-2b)
Webhook-хэндлеры больше не спавнят claude-агентов синхронно в процессе uvicorn.
Вместо этого они кладут **job** в персистентную SQLite-таблицу `jobs`
(`enqueue_job`, мгновенный ответ), а фоновый воркер (`src/queue_worker.py`)
забирает jobs с учётом `ORCH_MAX_CONCURRENCY` и запускает агента (`launch_job`,
та же Popen-логика, что и раньше).
Преимущества:
- **Рестарт-safe.** При старте jobs со статусом `running` возвращаются в `queued`
(queue-recovery в lifespan) — работа не теряется.
- **Лимит параллелизма.** Воркер не превышает `ORCH_MAX_CONCURRENCY`.
- **Ретраи.** Упавший job (exit≠0) ретраится пока `attempts < max_attempts`,
потом `failed` + Telegram-нотификация.
Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`.
**Resilience-слой:** дешёвый preflight (CLI/net, кэш, без токенов) гейтит claim;
429/overload детектится по логу (transient vs permanent), transient ретраится с
exp-backoff (`available_at`, Retry-After); circuit breaker паузит воркер после N
transient подряд. Подробности: `docs/ORCH-1_JOB_QUEUE.md`.
## Multi-repo: реестр проектов (ORCH-6)

View File

@@ -264,9 +264,71 @@ services:
- ~~Shared `/repos` checkout (гонки при параллельных задачах).~~ **РЕШЕНО (ORCH-2 / S-4):**
git worktree per task/branch — см. раздел «Изоляция через git worktree» ниже.
- **In-process daemon-потоки.** Агенты запускаются в daemon-потоках uvicorn. При
рестарте uvicorn запущенные агенты осиротевают → ловит orphan-recovery (M-1).
Целевая архитектура — очередь задач (F-2b, отдельно).
- ~~In-process daemon-потоки (рестарт → сироты, потеря работы).~~ **РЕШЕНО (ORCH-1 / F-2b):**
персистентная очередь jobs + фоновый воркер — см. раздел «Очередь задач (ORCH-1)» ниже.
Daemon-потоки monitor/watchdog остаются для одного запущенного агента, но при
рестарте его job возвращается в `queued` (queue-recovery) и переподхватывается.
## Очередь задач (ORCH-1 / F-2b)
Раньше webhook-хэндлер **синхронно** спавнил `subprocess.Popen` + 2 daemon-thread
прямо в процессе uvicorn (8 точек вызова). Рестарт = сироты + потеря работы,
нет лимита параллелизма, нет ретраев.
### Flow
```
webhook (plane/gitea) background thread (queue_worker)
│ │
enqueue_job() ---> [ jobs table ] <--- claim_next_job() (atomic queued->running)
(мгновенный status=queued │
ответ 200) launch_job(job)
AgentLauncher._spawn (Popen claude)
_monitor_agent (proc.wait, commit/push,
│ advance stage)
_finalize_job:
exit 0 -> mark_job done
exit !=0 & attempts<max -> requeue (queued)
exit !=0 & attempts>=max -> failed + Telegram
```
### Таблица `jobs`
| Колонка | Назначение |
|--------|------------|
| `status` | `queued``running``done` \| `failed` |
| `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) |
| `run_id` | FK на `agent_runs.id` после старта |
| `task_content` | ТЗ, которое пишется в task-файл агента |
| `error` | последняя ошибка |
`idx_jobs_status (status, id)` — быстрый FIFO-выбор queued.
### Атомарный claim
`claim_next_job()` делает `SELECT queued ORDER BY id LIMIT 1``UPDATE ... WHERE id=? AND
status='queued'` и проверяет `rowcount`. При гонке двух тиков лишь один UPDATE
переведёт строку в `running` (rowcount==1); проигравший берёт следующий job.
### Queue-recovery (рестарт-safe)
В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`:
jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`.
Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join).
### Конфиг
- `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs.
- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0) — период опроса.
Наблюдаемость: `GET /queue` — counts по статусам + последние 10 jobs.
> Совместимость: `launcher.launch()` (прямой синхронный запуск, `job_id=None`)
> сохранён для обратной совместимости. Очередь использует `launch_job()`;
> оба разделяют `_spawn()` (Popen-логика B-2 не изменена).
- **Gitea CI не настроен.** QG развития теперь локальный (`check_tests_local`);
Gitea CI-статусы не являются authoritative и не блокируют pipeline.
- **Docker внутри контейнера orchestrator НЕДОСТУПЕН.** Деплой идёт только через

127
docs/ORCH-1_JOB_QUEUE.md Normal file
View File

@@ -0,0 +1,127 @@
# ORCH-1 (F-2b): Persistent Job Queue
**Дата:** 2026-06-02
**Ветка:** `feature/ORCH-1-job-queue`
**Источник:** AUDIT_2026-06-02 (B-2 / F-2b)
## Проблема
Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил
`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в
процессе uvicorn, из **8 webhook-точек**. Последствия:
- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты,
работа теряется (M-1 лишь помечал `exit=-1` и звал человека).
- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude.
- **Нет ретраев** — упавший агент просто мёртв.
## Решение
Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер:
1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200.
2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает
jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента
(`launcher.launch_job`, та же Popen-логика).
3. По завершении `_monitor_agent``_finalize_job`:
- `exit 0``done`;
- `exit != 0` & `attempts < max_attempts` → requeue (`queued`);
- `exit != 0` & `attempts >= max_attempts``failed` + Telegram.
## Что изменено
| Файл | Изменение |
|------|-----------|
| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` |
| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) |
| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch``enqueue_job` |
| `src/webhooks/plane.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/webhooks/gitea.py` | 4 точки `launcher.launch``enqueue_job` |
| `src/queue_worker.py` | **НОВЫЙ**`QueueWorker` (drain loop + max_concurrency + graceful stop) |
| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` |
| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) |
## Атомарность claim
```sql
SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1;
UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now')
WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку
```
Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера
(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs).
## Сохранённые фиксы (НЕ сломаны)
- **B-1** task-file write (direct `open()` в worktree) — без изменений.
- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут.
- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него.
- **ORCH-2** worktree per task — без изменений.
- **ORCH-6** project registry/filter — без изменений.
## Acceptance
| # | Проверка | Статус |
|---|----------|--------|
| 1 | webhook кладёт job (queued) | ✅ enqueue_job |
| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job |
| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency |
| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail |
| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan |
| 6 | M-1 не сломан | ✅ оставлен в lifespan |
| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing |
| 8 | `/queue` | ✅ counts + recent |
## Тесты
```bash
IMG=$(docker inspect orchestrator --format '{{.Config.Image}}')
docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \
--entrypoint python3 $IMG -m pytest tests/ -q
# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError)
```
---
## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker)
Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса
проблем лечатся по-разному.
### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены
Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version`
(timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с).
FAIL → воркер НЕ claimит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping.
### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`)
rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file`
читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...`
`transient` или `permanent`. Извлекает `Retry-After`.
- **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts`
(лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет.
- **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`.
### C. Backoff + `available_at`
Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция
`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL
OR available_at <= datetime('now'))`. При transient: `available_at = now +
min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max).
### D. Circuit breaker (`CircuitBreaker` в queue_worker)
N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ
не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job);
ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти
воркера, отражается в `/queue.resilience`.
Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла).
### Конфиг (config.py)
`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`,
`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`.
### Тесты
`tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force),
классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After,
`available_at` гейтинг), launcher transient/permanent finalize, breaker
(open/half-open/closed/re-open, блок claim).

View File

@@ -4,7 +4,7 @@ import logging
import threading
import signal
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
from ..git_worktree import ensure_worktree, get_worktree_path
from ..qg.checks import QG_CHECKS
@@ -57,7 +57,10 @@ class AgentLauncher:
def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int:
"""
Launch a Claude CLI agent.
Launch a Claude CLI agent directly (legacy synchronous path).
Kept for backward compatibility (direct callers / existing tests). The
ORCH-1 job queue uses launch_job() instead, but both share _spawn().
Args:
agent: Agent role (analyst, architect, developer, reviewer, tester)
@@ -68,6 +71,31 @@ class AgentLauncher:
Returns:
agent_run_id from DB
"""
return self._spawn(agent, repo, task_content, task_id, job_id=None)
def launch_job(self, job: dict) -> int:
"""ORCH-1: launch an agent for a claimed queue job.
Same spawn path as launch(), but threads job['id'] through so the monitor
can update the job's status (done / requeue / failed) and link jobs.run_id
to the agent_runs row. Returns the agent_run_id.
"""
return self._spawn(
job["agent"],
job["repo"],
job.get("task_content"),
job.get("task_id"),
job_id=job["id"],
)
def _spawn(self, agent: str, repo: str, task_content: str = None,
task_id: int = None, job_id: int = None) -> int:
"""Shared spawn implementation for launch() and launch_job().
When job_id is set, the monitor/watchdog drive the jobs table status
(ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic
(B-1 / ORCH-2) are unchanged.
"""
config = self.AGENT_CONFIGS.get(agent)
if not config:
raise ValueError(f"Unknown agent: {agent}")
@@ -98,6 +126,14 @@ class AgentLauncher:
run_id = cursor.lastrowid
conn.commit()
# ORCH-1: link this job to the agent_runs row and stamp started_at.
if job_id is not None:
conn.execute(
"UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?",
(run_id, job_id),
)
conn.commit()
# Prepare output log path
output_path = f"/app/data/runs/{run_id}.log"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
@@ -154,6 +190,7 @@ class AgentLauncher:
t = threading.Thread(
target=self._watchdog,
args=(proc.pid, run_id),
kwargs={"job_id": job_id},
daemon=True,
)
t.start()
@@ -163,6 +200,7 @@ class AgentLauncher:
m = threading.Thread(
target=self._monitor_agent,
args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh),
kwargs={"job_id": job_id},
daemon=True,
)
m.start()
@@ -171,8 +209,13 @@ class AgentLauncher:
notify_agent_started(run_id, agent, task_id)
return run_id
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
"""Kill agent if it exceeds timeout."""
def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None):
"""Kill agent if it exceeds timeout.
ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit
code and drives the job retry/fail logic, so the watchdog itself only needs
to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry.
"""
import time
if timeout is None:
timeout = self.AGENT_TIMEOUT
@@ -190,7 +233,7 @@ class AgentLauncher:
except ProcessLookupError:
pass # Already finished
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None):
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None):
"""Wait for agent to finish, commit+push results, update DB.
B-2 fix: stdout already goes straight to the log file via Popen, so we just
@@ -318,6 +361,132 @@ class AgentLauncher:
if exit_code == 0:
self._try_advance_stage(run_id, agent, repo, branch)
# ORCH-1: drive the job-queue status for queue-launched jobs only.
# (Legacy direct launch() has job_id=None and is unaffected.)
if job_id is not None:
self._finalize_job(job_id, agent, run_id, exit_code, output_path=output_path)
def _backoff_seconds(self, transient_attempts: int, retry_after: int = None) -> int:
"""Exponential backoff for transient failures, honouring Retry-After.
backoff = min(2^transient_attempts * base, max). If the server sent a
Retry-After, use the larger of the two (never poll sooner than asked).
"""
base = settings.backoff_base_seconds
cap = settings.backoff_max_seconds
backoff = min((2 ** max(transient_attempts, 0)) * base, cap)
if retry_after is not None and retry_after > 0:
backoff = max(backoff, min(retry_after, cap))
return int(backoff)
def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None):
"""ORCH-1: update the jobs row after the agent process finished.
exit_code == 0 -> done (and resets the breaker streak via on_outcome).
exit_code != 0 -> classify the failure from the run log tail (token-free):
- TRANSIENT (429/overload/network): backoff-requeue with available_at in
the future + a SEPARATE transient_attempts budget
(settings.transient_max_attempts), honouring Retry-After. Reported to
the breaker so it opens after N consecutive transient failures.
- PERMANENT (code fault): ordinary attempts < max_attempts requeue,
otherwise 'failed' + Telegram.
"""
from ..db import get_job, mark_job
from ..error_classifier import classify_log_file
try:
job = get_job(job_id)
if not job:
return
if exit_code == 0:
mark_job(job_id, "done", run_id=run_id)
logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})")
self._record_outcome(transient=False, recovered=True)
return
# Classify the failure from the agent log tail (no token cost).
kind, retry_after = "permanent", None
log_path = output_path or f"/app/data/runs/{run_id}.log"
try:
kind, retry_after = classify_log_file(log_path)
except Exception:
pass
if kind == "transient":
self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after)
else:
self._finalize_permanent(job_id, agent, run_id, exit_code, job)
except Exception as e:
logger.error(f"Job {job_id}: _finalize_job error: {e}")
def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after):
"""Transient (429/overload/net) failure -> backoff requeue or fail when budget out."""
from ..db import mark_job, mark_job_transient
tattempts = job.get("transient_attempts", 0)
tmax = settings.transient_max_attempts
err = (f"transient (429/overload) agent {agent} exit={exit_code} "
f"(run_id={run_id}); retry_after={retry_after}")
self._record_outcome(transient=True, recovered=False)
if tattempts < tmax:
backoff = self._backoff_seconds(tattempts + 1, retry_after)
mark_job_transient(job_id, backoff, error=err)
logger.warning(
f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), "
f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}"
)
else:
mark_job(job_id, "failed", run_id=run_id, error=err)
logger.error(
f"Job {job_id} ({agent}) failed after {tattempts} transient attempts"
)
self._notify_failed(job_id, agent, job, run_id,
f"transient (rate-limit) after {tattempts} attempts")
def _finalize_permanent(self, job_id, agent, run_id, exit_code, job):
"""Permanent (code-fault) failure -> normal attempts<max requeue, then fail."""
from ..db import mark_job
attempts = job.get("attempts", 0)
max_attempts = job.get("max_attempts", 2)
err = f"agent {agent} exit_code={exit_code} (run_id={run_id})"
self._record_outcome(transient=False, recovered=False)
if attempts < max_attempts:
mark_job(job_id, "queued", run_id=run_id, error=err)
logger.warning(
f"Job {job_id} ({agent}) failed (exit={exit_code}), "
f"requeued (attempt {attempts}/{max_attempts})"
)
else:
mark_job(job_id, "failed", run_id=run_id, error=err)
logger.error(
f"Job {job_id} ({agent}) failed permanently after "
f"{attempts} attempts (exit={exit_code})"
)
self._notify_failed(job_id, agent, job, run_id,
f"{attempts} attempts (exit={exit_code})")
def _notify_failed(self, job_id, agent, job, run_id, why):
try:
from ..notifications import send_telegram
send_telegram(
f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) "
f"failed: {why}. Logs: /app/data/runs/{run_id}.log"
)
except Exception:
pass
def _record_outcome(self, transient: bool, recovered: bool):
"""Forward the run outcome to the circuit breaker (if a worker is wired).
Decoupled via a settable callback (set by QueueWorker.start) so the launcher
does not hard-import the worker (avoids a cycle) and tests can run the
launcher standalone.
"""
cb = getattr(self, "on_outcome", None)
if cb:
try:
cb(transient=transient, recovered=recovered)
except Exception:
pass
def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str):
"""After agent finishes successfully, check QG and advance stage if possible."""
try:
@@ -416,8 +585,8 @@ class AgentLauncher:
f"(attempt {retry_count+1}/3). Fix findings in "
f"docs/work-items/{work_item_id}/12-review.md"
)
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
@@ -446,8 +615,8 @@ class AgentLauncher:
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
new_job = enqueue_job("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
@@ -478,8 +647,8 @@ class AgentLauncher:
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})")
return
return
@@ -496,8 +665,8 @@ class AgentLauncher:
next_agent = get_agent_for_stage(next_stage)
if next_agent:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})")
new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})")
except Exception as e:
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")

View File

@@ -30,6 +30,29 @@ class Settings(BaseSettings):
# DB
db_path: str = "/app/data/orchestrator.db"
# ORCH-1 (F-2b): persistent job queue / background worker.
# max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY)
# queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL)
max_concurrency: int = 1
queue_poll_interval: float = 2.0
# ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker.
# preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds);
# the worker does NOT re-run `claude --version` more often
# than this (env ORCH_PREFLIGHT_CACHE_TTL).
# backoff_base_seconds -> base for exponential transient backoff.
# backoff_max_seconds -> ceiling for the transient backoff.
# transient_max_attempts -> retry budget for transient (429/overload/network)
# failures, separate from code-fault `attempts`.
# breaker_threshold -> consecutive transient failures that OPEN the breaker.
# breaker_pause_seconds -> how long the breaker stays open before half-open.
preflight_cache_ttl: int = 45
backoff_base_seconds: int = 10
backoff_max_seconds: int = 600
transient_max_attempts: int = 5
breaker_threshold: int = 3
breaker_pause_seconds: int = 300
# Telegram notifications
telegram_bot_token: str = ""

266
src/db.py
View File

@@ -40,10 +40,44 @@ def init_db():
exit_code INTEGER,
output_path TEXT
);
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
-- return immediately; a background worker claims jobs (respecting
-- max_concurrency), spawns the claude agent, and updates the status.
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
repo TEXT NOT NULL,
task_id INTEGER, -- FK tasks.id (nullable)
task_content TEXT, -- written to the agent task_file
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 2,
run_id INTEGER, -- agent_runs.id once started
error TEXT, -- last error message
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
finished_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
""")
# Lightweight migration: add resilience columns to a pre-existing jobs table
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
conn.close()
def _ensure_column(conn, table: str, column: str, decl: str):
"""Add a column to `table` if it does not already exist (idempotent migration)."""
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if column not in cols:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
conn.commit()
def get_task_by_plane_id(plane_id: str) -> dict | None:
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
conn = get_db()
@@ -105,3 +139,235 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
next_num = 1
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------
def enqueue_job(
agent: str,
repo: str,
task_content: str | None = None,
task_id: int | None = None,
max_attempts: int = 2,
) -> int:
"""Enqueue a new job (status='queued'). Returns the new job id.
This is what webhook handlers call instead of launching an agent in-process:
it is a fast DB INSERT that returns immediately. The background worker
(queue_worker) picks the job up later.
"""
conn = get_db()
cursor = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
"VALUES (?, ?, ?, ?, ?)",
(agent, repo, task_id, task_content, max_attempts),
)
job_id = cursor.lastrowid
conn.commit()
conn.close()
return job_id
def claim_next_job() -> dict | None:
"""Atomically claim the oldest queued job and mark it 'running'.
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
and we check `rowcount`. If two worker ticks race for the same row, only the
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
and retries the SELECT. We rely on SQLite's default per-connection transaction
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
when the queue is empty.
"""
conn = get_db()
try:
while True:
row = conn.execute(
"SELECT id FROM jobs WHERE status='queued' "
"AND (available_at IS NULL OR available_at <= datetime('now')) "
"ORDER BY id LIMIT 1"
).fetchone()
if not row:
return None
job_id = row["id"]
cur = conn.execute(
"UPDATE jobs SET status='running', "
"attempts = attempts + 1, started_at = datetime('now') "
"WHERE id = ? AND status='queued'",
(job_id,),
)
conn.commit()
if cur.rowcount == 1:
claimed = conn.execute(
"SELECT * FROM jobs WHERE id = ?", (job_id,)
).fetchone()
return dict(claimed)
# Lost the race for this row; loop and try the next queued job.
finally:
conn.close()
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
error: str | None = None) -> None:
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
Increments `transient_attempts` (separate from the code-fault `attempts`),
sets status back to 'queued', and gates re-pickup via `available_at` =
now + backoff seconds. started_at/finished_at are cleared.
"""
conn = get_db()
sets = [
"status='queued'",
"transient_attempts = transient_attempts + 1",
"available_at = datetime('now', ?)",
"started_at = NULL",
"finished_at = NULL",
]
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
if error is not None:
sets.append("error = ?")
params.append(error)
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def mark_job(
job_id: int,
status: str,
run_id: int | None = None,
error: str | None = None,
):
"""Update a job's status (queued|running|done|failed).
- run_id (optional): link to the agent_runs row that executed this job.
- error (optional): last error message (for failed/retry).
- 'done'/'failed' also stamp finished_at.
- 'queued' (requeue for retry) clears started_at/finished_at so the next
claim treats it as fresh.
"""
conn = get_db()
sets = ["status = ?"]
params: list = [status]
if run_id is not None:
sets.append("run_id = ?")
params.append(run_id)
if error is not None:
sets.append("error = ?")
params.append(error)
if status in ("done", "failed"):
sets.append("finished_at = datetime('now')")
elif status == "queued":
sets.append("started_at = NULL")
sets.append("finished_at = NULL")
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def count_running_jobs() -> int:
"""Number of jobs currently in 'running' status (for max_concurrency)."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE status='running'"
).fetchone()[0]
conn.close()
return int(n)
def requeue_running_jobs() -> int:
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
claim does NOT re-increment beyond what is needed; claim_next_job increments on
pickup). Returns the number of requeued jobs.
"""
conn = get_db()
cur = conn.execute(
"UPDATE jobs SET status='queued', started_at = NULL "
"WHERE status='running'"
)
conn.commit()
n = cur.rowcount
conn.close()
return int(n)
def get_job(job_id: int) -> dict | None:
"""Fetch a single job by id."""
conn = get_db()
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
conn.close()
return dict(row) if row else None
def job_status_counts() -> dict:
"""Return counts grouped by status (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
).fetchall()
conn.close()
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
for r in rows:
counts[r["status"]] = r["n"]
return counts
def recent_jobs(limit: int = 10) -> list[dict]:
"""Return the most recent jobs (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# ORCH-1b (resilience): transient backoff helpers
# ---------------------------------------------------------------------------
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
Unlike a code-fault requeue, this:
- increments `transient_attempts` (a separate budget from code-fault attempts)
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
up until the backoff window elapses
- sets status back to 'queued' and clears started_at/finished_at
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
"""
conn = get_db()
conn.execute(
"UPDATE jobs SET status='queued', "
"transient_attempts = transient_attempts + 1, "
"available_at = datetime('now', ? || ' seconds'), "
"started_at = NULL, finished_at = NULL, "
"error = COALESCE(?, error) "
"WHERE id = ?",
(f"+{int(round(delay_seconds))}", error, job_id),
)
conn.commit()
conn.close()
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
delay = min(2**transient_attempts * base, max). If the server sent a
Retry-After hint we honour it as a floor (use the larger of the two so we
never poll sooner than the server asked).
`transient_attempts` is the count AFTER this failure (i.e. how many transient
failures have occurred), so the first backoff uses 2**1.
"""
base = getattr(settings, "backoff_base_seconds", 10)
cap = getattr(settings, "backoff_max_seconds", 600)
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
if retry_after is not None and retry_after > 0:
return float(min(max(exp, retry_after), cap))
return float(exp)

87
src/error_classifier.py Normal file
View File

@@ -0,0 +1,87 @@
"""ORCH-1 resilience: classify an agent failure as transient vs permanent.
Rate limits / overload / network blips cannot be reliably predicted in advance,
so we classify *after the run* by scanning the agent's combined stdout/stderr log
(B-2 sends both to /app/data/runs/<run_id>.log).
- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc.
=> backoff + transient retry (separate counter, larger budget).
- permanent -> a genuine code fault / agent error
=> normal attempts < max_attempts, then 'failed'.
Also extracts a Retry-After hint (seconds) when the server provided one.
"""
import re
# Case-insensitive substrings/patterns that signal a transient/rate-limit issue.
_TRANSIENT_PATTERNS = [
r"\b429\b",
r"rate[\s_-]*limit",
r"rate_limit_error",
r"overloaded",
r"overloaded_error",
r"too many requests",
r"quota",
r"insufficient[_\s-]*quota",
r"retry[\s-]*after",
r"service unavailable",
r"\b503\b",
r"\b529\b",
r"timed out",
r"timeout",
r"connection (reset|refused|error|aborted)",
r"temporarily unavailable",
r"econnreset",
r"etimedout",
]
_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE)
# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or
# "retry after 30 seconds". Returns the integer seconds.
_RETRY_AFTER_RE = re.compile(
r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)",
re.IGNORECASE,
)
def classify_text(text: str) -> str:
"""Return 'transient' or 'permanent' for a chunk of log/stderr text."""
if not text:
return "permanent"
return "transient" if _TRANSIENT_RE.search(text) else "permanent"
def parse_retry_after(text: str) -> int | None:
"""Return Retry-After seconds if present in the text, else None."""
if not text:
return None
m = _RETRY_AFTER_RE.search(text)
if m:
try:
return int(m.group(1))
except (TypeError, ValueError):
return None
return None
def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]:
"""Classify the tail of a log file.
Reads the last `tail_bytes` of the log (rate-limit messages appear near the
end) and returns (classification, retry_after_seconds_or_None).
On any read error, treats it as 'permanent' (no special backoff).
"""
if not path:
return "permanent", None
try:
with open(path, "rb") as f:
try:
f.seek(-tail_bytes, 2)
except OSError:
f.seek(0)
data = f.read()
text = data.decode("utf-8", errors="replace")
except Exception:
return "permanent", None
return classify_text(text), parse_retry_after(text)

View File

@@ -51,7 +51,25 @@ async def lifespan(app: FastAPI):
except Exception:
pass
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
yield
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
# worker that died on the previous restart -> put it back to 'queued' so the
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
from .db import requeue_running_jobs
requeued = requeue_running_jobs()
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()
try:
yield
finally:
# Graceful shutdown of the worker (running agents keep going; their jobs
# are requeued on next start via queue-recovery if the process dies).
worker.stop()
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
@@ -73,3 +91,17 @@ async def status():
).fetchall()
conn.close()
return {"active_tasks": [dict(t) for t in tasks]}
@app.get("/queue")
async def queue():
"""ORCH-1: job-queue observability — status counts + recent jobs."""
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"recent": recent_jobs(10),
}

106
src/preflight.py Normal file
View File

@@ -0,0 +1,106 @@
"""ORCH-1 resilience: cheap preflight check (CLI / network available?).
Goal: before the worker claims a job, confirm the claude CLI binary and runtime
are reachable WITHOUT spending any tokens. We only do local/cheap checks:
1. os.path.exists(CLAUDE_BIN) -- instant
2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API
The result is cached for `preflight_cache_ttl` seconds so we do not re-run
`claude --version` on every worker tick.
🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the
rate limit and add latency. Preflight is local-only.
"""
import os
import time
import logging
import subprocess
from .config import settings
logger = logging.getLogger("orchestrator.preflight")
_VERSION_TIMEOUT = 5
class _PreflightCache:
def __init__(self):
self.ts: float = 0.0
self.ok: bool = False
self.reason: str = "not checked yet"
_cache = _PreflightCache()
def _claude_bin() -> str:
"""Resolve the claude binary preflight should check.
Must match the binary the launcher actually spawns. The launcher hardcodes
AgentLauncher.CLAUDE_BIN for the real Popen, so we prefer that; we only fall
back to settings.claude_bin / a default if it is somehow unset. (Note: the
container's ORCH_CLAUDE_BIN may point elsewhere; preflight follows the path
that is genuinely executed, not the unused env override.)
"""
try:
from .agents.launcher import AgentLauncher
launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None)
if launcher_bin and os.path.exists(launcher_bin):
return launcher_bin
# Launcher path not present -> fall back to configured/default.
return launcher_bin or getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
except Exception:
return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe"
def _run_version(bin_path: str) -> tuple[bool, str]:
"""`claude --version` — proves the CLI runs without touching the API."""
try:
r = subprocess.run(
[bin_path, "--version"],
capture_output=True,
text=True,
timeout=_VERSION_TIMEOUT,
)
if r.returncode == 0:
return True, (r.stdout or r.stderr or "").strip()[:120] or "ok"
return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}"
except subprocess.TimeoutExpired:
return False, f"--version timed out after {_VERSION_TIMEOUT}s"
except FileNotFoundError:
return False, "claude binary not found (FileNotFoundError)"
except Exception as e: # pragma: no cover - defensive
return False, f"--version error: {e}"
def _compute() -> tuple[bool, str]:
bin_path = _claude_bin()
if not os.path.exists(bin_path):
return False, f"CLAUDE_BIN not found: {bin_path}"
return _run_version(bin_path)
def check(force: bool = False) -> tuple[bool, str]:
"""Return (ok, reason). Cached for preflight_cache_ttl seconds.
force=True bypasses the cache (used by the breaker half-open probe / tests).
"""
now = time.time()
ttl = settings.preflight_cache_ttl
if not force and _cache.ts > 0 and (now - _cache.ts) < ttl:
return _cache.ok, _cache.reason
ok, reason = _compute()
_cache.ts = now
_cache.ok = ok
_cache.reason = reason
if not ok:
logger.warning(f"Preflight FAIL: {reason}")
return ok, reason
def reset_cache() -> None:
"""Invalidate the cache (tests / forced recheck)."""
_cache.ts = 0.0
_cache.ok = False
_cache.reason = "reset"

246
src/queue_worker.py Normal file
View File

@@ -0,0 +1,246 @@
"""ORCH-1 (F-2b): background job-queue worker with resilience layer.
A single background thread polls the `jobs` table and spawns agents:
while running:
if breaker.open and not cooled_down: sleep; continue # don't touch CLI
if not preflight.ok: sleep; continue # CLI/net down -> wait
while count_running_jobs() < max_concurrency:
job = claim_next_job() # atomic queued -> running (available_at-gated)
if not job: break
launcher.launch_job(job) # spawns claude (Popen) + monitor thread
sleep(poll_interval)
Resilience (ДОПОЛНЕНИЕ):
A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming.
B/C. The launcher classifies failures (transient vs permanent) and applies
backoff via available_at; the worker only needs to honour available_at
(claim_next_job does) and react to transient outcomes via the breaker.
D. Circuit breaker — N consecutive transient failures -> open (pause M minutes,
no CLI calls, Telegram alert) -> half-open (probe one job) -> closed.
Design: plain daemon thread + threading.Event (the launcher already manages its
own monitor/watchdog threads + blocking Popen).
"""
import time
import logging
import threading
from .config import settings
from .db import claim_next_job, count_running_jobs
from .agents.launcher import launcher
from . import preflight
logger = logging.getLogger("orchestrator.queue_worker")
class CircuitBreaker:
"""Trips after `threshold` consecutive transient failures.
States: closed -> (threshold transient) -> open -> (after pause) half-open
-> (recovered) closed | (transient again) open.
Thread-safe enough for our single-worker + monitor-thread callbacks (a lock
guards the counters).
"""
def __init__(self, threshold: int = None, pause_seconds: int = None):
self.threshold = threshold if threshold is not None else settings.breaker_threshold
self.pause_seconds = (
pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds
)
self._lock = threading.Lock()
self.state = "closed" # closed | open | half-open
self.consecutive_transient = 0
self.opened_at = 0.0
self._notify = None # optional callable(message) for alerts
def set_notifier(self, fn):
self._notify = fn
def record_transient(self):
with self._lock:
self.consecutive_transient += 1
if self.state == "half-open":
# Probe failed -> re-open.
self._open("circuit re-opened: probe job hit transient again")
elif self.consecutive_transient >= self.threshold and self.state == "closed":
self._open(
f"circuit OPEN: {self.consecutive_transient} consecutive "
f"transient failures; pausing {self.pause_seconds}s (no CLI calls)"
)
def record_recovered(self):
with self._lock:
self.consecutive_transient = 0
if self.state in ("half-open", "open"):
self.state = "closed"
logger.info("Circuit CLOSED: recovered")
def record_permanent(self):
# A clean permanent (code-fault) failure breaks the transient streak.
with self._lock:
self.consecutive_transient = 0
def _open(self, msg: str):
self.state = "open"
self.opened_at = time.time()
logger.warning(msg)
if self._notify:
try:
self._notify(f"\U0001f534 {msg}")
except Exception:
pass
def allow_claim(self) -> bool:
"""Return True if the worker may attempt to claim/launch a job now.
- closed -> yes.
- open -> no until pause elapsed; then transition to half-open (yes, one probe).
- half-open -> yes (the single probe).
"""
with self._lock:
if self.state == "closed":
return True
if self.state == "open":
if (time.time() - self.opened_at) >= self.pause_seconds:
self.state = "half-open"
logger.info("Circuit HALF-OPEN: probing one job")
return True
return False
# half-open: allow the probe.
return True
def snapshot(self) -> dict:
with self._lock:
remaining = 0
if self.state == "open":
remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at)))
return {
"state": self.state,
"consecutive_transient": self.consecutive_transient,
"pause_remaining_s": remaining,
}
class QueueWorker:
"""Background worker that drains the persistent job queue (with resilience)."""
def __init__(self, max_concurrency: int = None, poll_interval: float = None,
breaker: CircuitBreaker = None):
self.max_concurrency = (
max_concurrency if max_concurrency is not None else settings.max_concurrency
)
self.poll_interval = (
poll_interval if poll_interval is not None else settings.queue_poll_interval
)
self.breaker = breaker or CircuitBreaker()
self.last_preflight_ok = True
self.last_preflight_reason = "not checked"
self._stop = threading.Event()
self._thread: threading.Thread | None = None
# --- circuit breaker outcome callback wired into the launcher ----------
def _on_outcome(self, transient: bool, recovered: bool):
if recovered:
self.breaker.record_recovered()
elif transient:
self.breaker.record_transient()
else:
self.breaker.record_permanent()
def _drain_once(self):
"""Claim and launch jobs until concurrency is full or the queue is empty.
Gated by the circuit breaker and preflight: if the breaker is open (and
not yet cooled down) or preflight fails, we do NOT claim — jobs stay
queued and no CLI/tokens are touched.
"""
if not self.breaker.allow_claim():
return
ok, reason = preflight.check()
self.last_preflight_ok = ok
self.last_preflight_reason = reason
if not ok:
logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick")
return
# In half-open we only probe a single job, regardless of max_concurrency.
half_open = self.breaker.snapshot()["state"] == "half-open"
launched = 0
while not self._stop.is_set():
if half_open and launched >= 1:
return
if count_running_jobs() >= self.max_concurrency:
return
job = claim_next_job()
if not job:
return
launched += 1
try:
run_id = launcher.launch_job(job)
logger.info(
f"Worker launched job {job['id']} ({job['agent']}, "
f"repo {job['repo']}) -> run_id={run_id}"
)
except Exception as e:
# Launch itself failed (e.g. repo missing): treat as a permanent
# launch error so the job does not wedge as 'running' forever.
logger.error(f"Worker failed to launch job {job['id']}: {e}")
try:
from .db import get_job, mark_job
j = get_job(job["id"])
attempts = j.get("attempts", 0) if j else 0
max_attempts = j.get("max_attempts", 2) if j else 2
if attempts < max_attempts:
mark_job(job["id"], "queued", error=f"launch error: {e}")
else:
mark_job(job["id"], "failed", error=f"launch error: {e}")
except Exception:
pass
def _run(self):
logger.info(
f"Queue worker started (max_concurrency={self.max_concurrency}, "
f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})"
)
while not self._stop.is_set():
try:
self._drain_once()
except Exception as e:
logger.error(f"Queue worker loop error: {e}")
self._stop.wait(self.poll_interval)
logger.info("Queue worker stopped")
def start(self):
if self._thread and self._thread.is_alive():
return
# Wire breaker alerting + launcher outcome callback.
try:
from .notifications import send_telegram
self.breaker.set_notifier(send_telegram)
except Exception:
pass
launcher.on_outcome = self._on_outcome
self._stop.clear()
self._thread = threading.Thread(
target=self._run, name="queue-worker", daemon=True
)
self._thread.start()
def stop(self, timeout: float = 5.0):
self._stop.set()
if self._thread:
self._thread.join(timeout=timeout)
def status(self) -> dict:
"""Resilience snapshot for /queue."""
return {
"breaker": self.breaker.snapshot(),
"preflight_ok": self.last_preflight_ok,
"preflight_reason": self.last_preflight_reason,
}
# Module-level singleton used by the FastAPI lifespan.
worker = QueueWorker()

View File

@@ -10,7 +10,7 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -123,8 +123,8 @@ async def handle_push(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
@@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
@@ -272,8 +272,8 @@ async def handle_pr(payload: dict):
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})")
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
@@ -297,8 +297,8 @@ async def handle_pr(payload: dict):
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})")
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
else:

View File

@@ -14,6 +14,7 @@ from ..db import (
get_task_by_plane_id,
get_next_work_item_id,
update_task_stage,
enqueue_job,
)
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
@@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
if task_row:
task_id = task_row[0]
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: launched analyst (run_id={run_id})")
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).")
@@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""):
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _plane_comment
_plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}")
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})")
else:
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
@@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""):
f"Read the latest comment in Plane and revise your artifacts.\n"
f"Answer: {comment_body[:500]}"
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
from ..plane_sync import add_comment as _pc2
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.")
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
@@ -386,9 +387,9 @@ async def _try_advance_stage(
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}"
run_id = launcher.launch(agent, repo, task_desc, task_id=task_id)
job_id = enqueue_job(agent, repo, task_desc, task_id=task_id)
plane_notify_stage(work_item_id, current_stage, next_stage, agent)
logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}")
logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
logger.error(f"Agent launch failed: {e}")

304
tests/test_queue.py Normal file
View File

@@ -0,0 +1,304 @@
"""Tests for ORCH-1 (F-2b) persistent job queue.
Covers:
- enqueue_job -> claim_next_job -> mark_job lifecycle
- claim_next_job atomicity (no double-dispatch of the same job)
- retry: fail -> requeue while attempts < max_attempts, then failed
- requeue_running_jobs (queue-recovery)
- count_running_jobs / job_status_counts / recent_jobs
- QueueWorker respects max_concurrency (Popen / launch fully mocked)
The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker
tests, and the launcher finalize logic is exercised directly via mark_job.
"""
import os
import tempfile
import pytest
# Override env before importing app modules (same convention as test_qg.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import (
init_db,
enqueue_job,
claim_next_job,
mark_job,
count_running_jobs,
requeue_running_jobs,
get_job,
job_status_counts,
recent_jobs,
)
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
"""Point the DB at a fresh per-test sqlite file and init the schema."""
dbfile = tmp_path / "queue.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
init_db()
yield
# ---------------------------------------------------------------------------
# enqueue / claim / mark lifecycle
# ---------------------------------------------------------------------------
class TestLifecycle:
def test_enqueue_creates_queued_job(self):
jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7)
job = get_job(jid)
assert job["status"] == "queued"
assert job["agent"] == "analyst"
assert job["repo"] == "enduro-trails"
assert job["task_content"] == "task body"
assert job["task_id"] == 7
assert job["attempts"] == 0
assert job["max_attempts"] == 2
def test_claim_marks_running_and_increments_attempts(self):
jid = enqueue_job("developer", "repo")
claimed = claim_next_job()
assert claimed is not None
assert claimed["id"] == jid
assert claimed["status"] == "running"
assert claimed["attempts"] == 1
assert count_running_jobs() == 1
def test_claim_empty_queue_returns_none(self):
assert claim_next_job() is None
def test_claim_is_fifo(self):
a = enqueue_job("analyst", "r")
b = enqueue_job("developer", "r")
assert claim_next_job()["id"] == a
assert claim_next_job()["id"] == b
def test_mark_done(self):
jid = enqueue_job("tester", "r")
claim_next_job()
mark_job(jid, "done", run_id=42)
job = get_job(jid)
assert job["status"] == "done"
assert job["run_id"] == 42
assert job["finished_at"] is not None
assert count_running_jobs() == 0
def test_mark_failed_records_error(self):
jid = enqueue_job("tester", "r")
claim_next_job()
mark_job(jid, "failed", run_id=9, error="boom")
job = get_job(jid)
assert job["status"] == "failed"
assert job["error"] == "boom"
assert job["finished_at"] is not None
# ---------------------------------------------------------------------------
# claim atomicity — no double dispatch
# ---------------------------------------------------------------------------
class TestClaimAtomicity:
def test_single_job_claimed_once(self):
jid = enqueue_job("analyst", "r")
first = claim_next_job()
second = claim_next_job()
assert first["id"] == jid
assert second is None # already running, not re-dispatched
def test_concurrent_claims_no_duplicate(self):
"""Many enqueued jobs claimed from parallel threads -> each claimed once."""
import threading
n = 20
for _ in range(n):
enqueue_job("developer", "r")
claimed_ids = []
lock = threading.Lock()
def grab():
while True:
job = claim_next_job()
if job is None:
return
with lock:
claimed_ids.append(job["id"])
threads = [threading.Thread(target=grab) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(claimed_ids) == n
assert len(set(claimed_ids)) == n # no id claimed twice
assert count_running_jobs() == n
# ---------------------------------------------------------------------------
# retry semantics (mirrors launcher._finalize_job logic)
# ---------------------------------------------------------------------------
class TestRetry:
def test_fail_requeues_while_under_max(self):
jid = enqueue_job("developer", "r", max_attempts=2)
job = claim_next_job() # attempts=1
assert job["attempts"] == 1
# attempts(1) < max(2) -> requeue
mark_job(jid, "queued", error="exit 1")
j = get_job(jid)
assert j["status"] == "queued"
assert j["error"] == "exit 1"
assert j["started_at"] is None # requeue clears started_at
def test_fail_fails_when_max_reached(self):
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job() # attempts=1 -> requeue
mark_job(jid, "queued")
job2 = claim_next_job() # attempts=2
assert job2["attempts"] == 2
# attempts(2) >= max(2) -> failed
mark_job(jid, "failed", error="exit 1")
assert get_job(jid)["status"] == "failed"
def test_finalize_job_done(self):
"""launcher._finalize_job marks done on exit_code 0 (no Popen needed)."""
from src.agents.launcher import AgentLauncher
jid = enqueue_job("analyst", "r")
claim_next_job()
AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0)
assert get_job(jid)["status"] == "done"
def test_finalize_job_requeue_then_fail(self, monkeypatch):
from src.agents.launcher import AgentLauncher
# Silence telegram side-effect.
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
lr = AgentLauncher()
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job() # attempts=1
lr._finalize_job(jid, "developer", run_id=1, exit_code=2)
assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue
claim_next_job() # attempts=2
lr._finalize_job(jid, "developer", run_id=2, exit_code=2)
assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed
# ---------------------------------------------------------------------------
# queue-recovery
# ---------------------------------------------------------------------------
class TestRequeueRunning:
def test_requeue_running_jobs(self):
a = enqueue_job("analyst", "r")
b = enqueue_job("developer", "r")
claim_next_job() # a -> running
claim_next_job() # b -> running
assert count_running_jobs() == 2
n = requeue_running_jobs()
assert n == 2
assert count_running_jobs() == 0
assert get_job(a)["status"] == "queued"
assert get_job(b)["status"] == "queued"
def test_requeue_preserves_attempts(self):
jid = enqueue_job("analyst", "r")
claim_next_job() # attempts=1
requeue_running_jobs()
assert get_job(jid)["attempts"] == 1 # not reset
# ---------------------------------------------------------------------------
# observability helpers
# ---------------------------------------------------------------------------
class TestObservability:
def test_status_counts(self):
enqueue_job("analyst", "r") # stays queued
enqueue_job("developer", "r") # first claimed -> running (FIFO)
claim_next_job()
counts = job_status_counts()
assert counts["running"] == 1
assert counts["queued"] == 1
assert counts["done"] == 0
assert counts["failed"] == 0
def test_recent_jobs_desc(self):
ids = [enqueue_job("analyst", "r") for _ in range(3)]
recent = recent_jobs(10)
assert [r["id"] for r in recent] == sorted(ids, reverse=True)
# ---------------------------------------------------------------------------
# QueueWorker max_concurrency (launch_job fully mocked — no real Popen)
# ---------------------------------------------------------------------------
class TestWorkerConcurrency:
@pytest.fixture(autouse=True)
def _ok_preflight(self, monkeypatch):
# ORCH-1 resilience: the worker gates claims behind preflight; in tests there
# is no claude binary, so stub preflight OK to exercise pure queue/concurrency.
monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok"))
def test_worker_respects_max_concurrency(self, monkeypatch):
from src.queue_worker import QueueWorker
launched = []
def fake_launch_job(job):
# Simulate a long-running agent: the job stays 'running' (we do NOT
# mark it done), so the slot remains occupied.
launched.append(job["id"])
return 100 + job["id"]
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
for _ in range(5):
enqueue_job("developer", "r")
w = QueueWorker(max_concurrency=2, poll_interval=0.01)
w._drain_once()
# Only max_concurrency jobs may be launched / running at once.
assert len(launched) == 2
assert count_running_jobs() == 2
def test_worker_drains_as_slots_free(self, monkeypatch):
from src.queue_worker import QueueWorker
def fake_launch_job(job):
# Immediately complete the job so the slot frees for the next claim.
mark_job(job["id"], "done", run_id=job["id"])
return job["id"]
monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job)
for _ in range(4):
enqueue_job("analyst", "r")
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
w._drain_once()
# With instant completion and concurrency 1, one drain pass empties the queue.
assert job_status_counts()["done"] == 4
assert count_running_jobs() == 0
def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch):
from src.queue_worker import QueueWorker
def boom(job):
raise RuntimeError("repo missing")
monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom)
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
enqueue_job("developer", "r", max_attempts=1)
w = QueueWorker(max_concurrency=1, poll_interval=0.01)
w._drain_once()
# attempts=1 >= max_attempts=1 -> failed, not stuck running.
assert count_running_jobs() == 0
counts = job_status_counts()
assert counts["failed"] == 1

295
tests/test_resilience.py Normal file
View File

@@ -0,0 +1,295 @@
"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker.
No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job
are mocked. DB is a fresh per-test sqlite file.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
import src.db as db
from src.db import (
init_db, enqueue_job, claim_next_job, get_job, count_running_jobs,
mark_job_transient,
)
from src import preflight, error_classifier
from src.error_classifier import classify_text, parse_retry_after, classify_log_file
from src.queue_worker import QueueWorker, CircuitBreaker
from src.agents.launcher import AgentLauncher
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db"))
init_db()
preflight.reset_cache()
yield
# ---------------------------------------------------------------------------
# A. Preflight
# ---------------------------------------------------------------------------
class TestPreflight:
def test_fail_when_bin_missing(self, monkeypatch):
monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude")
ok, reason = preflight.check(force=True)
assert ok is False
assert "not found" in reason.lower()
def test_ok_when_version_succeeds(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("#!/bin/sh\necho v1\n")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3"))
ok, reason = preflight.check(force=True)
assert ok is True
def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("x")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999)
calls = {"n": 0}
def counting_version(b):
calls["n"] += 1
return True, "ok"
monkeypatch.setattr(preflight, "_run_version", counting_version)
preflight.reset_cache()
preflight.check() # first -> runs version
preflight.check() # cached -> no extra version call
preflight.check()
assert calls["n"] == 1
def test_force_bypasses_cache(self, monkeypatch, tmp_path):
fake_bin = tmp_path / "claude"
fake_bin.write_text("x")
monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin))
calls = {"n": 0}
monkeypatch.setattr(preflight, "_run_version",
lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1])
preflight.reset_cache()
preflight.check()
preflight.check(force=True)
assert calls["n"] == 2
def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch):
# Preflight FAIL -> job stays queued, launch_job never called.
monkeypatch.setattr("src.queue_worker.preflight.check",
lambda *a, **k: (False, "down"))
called = {"launch": False}
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
lambda job: called.__setitem__("launch", True))
jid = enqueue_job("analyst", "r")
QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once()
assert called["launch"] is False
assert get_job(jid)["status"] == "queued"
assert count_running_jobs() == 0
# ---------------------------------------------------------------------------
# B. Error classifier
# ---------------------------------------------------------------------------
class TestClassifier:
@pytest.mark.parametrize("text", [
"Error: 429 Too Many Requests",
"anthropic rate limit exceeded",
"overloaded_error: server is overloaded",
"API quota exhausted",
"503 Service Unavailable",
"connection reset by peer",
])
def test_transient_patterns(self, text):
assert classify_text(text) == "transient"
@pytest.mark.parametrize("text", [
"Traceback: KeyError 'foo'",
"SyntaxError: invalid syntax",
"assertion failed in test",
"",
])
def test_permanent_patterns(self, text):
assert classify_text(text) == "permanent"
def test_retry_after_header(self):
assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42
def test_retry_after_json(self):
assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7
def test_retry_after_absent(self):
assert parse_retry_after("just an error") is None
def test_classify_log_file(self, tmp_path):
p = tmp_path / "run.log"
p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n")
kind, ra = classify_log_file(str(p))
assert kind == "transient"
assert ra == 30
def test_classify_missing_file_is_permanent(self):
kind, ra = classify_log_file("/no/such/log")
assert kind == "permanent"
assert ra is None
# ---------------------------------------------------------------------------
# C. Backoff + available_at gating
# ---------------------------------------------------------------------------
class TestBackoff:
def test_backoff_grows_exponentially(self):
lr = AgentLauncher()
# base=10, cap=600 (defaults)
b1 = lr._backoff_seconds(1)
b2 = lr._backoff_seconds(2)
b3 = lr._backoff_seconds(3)
assert b1 == 20 # 2^1*10
assert b2 == 40 # 2^2*10
assert b3 == 80 # 2^3*10
assert b2 > b1 and b3 > b2
def test_backoff_capped(self):
lr = AgentLauncher()
assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds
def test_retry_after_respected_when_larger(self):
lr = AgentLauncher()
# transient_attempts=1 -> base backoff 20; Retry-After=120 wins.
assert lr._backoff_seconds(1, retry_after=120) == 120
def test_retry_after_ignored_when_smaller(self):
lr = AgentLauncher()
assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger
def test_transient_requeue_sets_future_available_at_and_claim_skips(self):
jid = enqueue_job("developer", "r")
claim_next_job()
# Big backoff -> available_at far in the future.
mark_job_transient(jid, 3600, error="429")
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 1
assert job["available_at"] is not None
# claim must NOT pick it up while available_at is in the future.
assert claim_next_job() is None
def test_transient_requeue_claimable_when_due(self):
jid = enqueue_job("developer", "r")
claim_next_job()
mark_job_transient(jid, -5, error="429") # available_at in the past
c = claim_next_job()
assert c is not None and c["id"] == jid
# ---------------------------------------------------------------------------
# D. Launcher transient/permanent finalize (no Popen)
# ---------------------------------------------------------------------------
class TestFinalizeClassified:
def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
log = tmp_path / "1.log"
log.write_text("Error 429 rate limit exceeded\n")
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job()
AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1,
output_path=str(log))
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 1
assert job["available_at"] is not None # backoff-gated
assert job["attempts"] == 1 # code-fault budget NOT burned
def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
log = tmp_path / "2.log"
log.write_text("Traceback: ValueError\n")
jid = enqueue_job("developer", "r", max_attempts=2)
claim_next_job()
AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1,
output_path=str(log))
job = get_job(jid)
assert job["status"] == "queued"
assert job["transient_attempts"] == 0 # not transient
assert job["available_at"] is None # no backoff for code-fault
def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch):
monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None)
monkeypatch.setattr(db.settings, "transient_max_attempts", 2)
log = tmp_path / "3.log"
log.write_text("overloaded_error\n")
lr = AgentLauncher()
jid = enqueue_job("developer", "r")
claim_next_job()
lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log))
assert get_job(jid)["status"] == "queued" # transient 1 -> requeue
# force claimable and retry
mark_job_transient(jid, -1) # makes it due; transient=2 now
claim_next_job()
lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log))
assert get_job(jid)["status"] == "failed" # transient budget exhausted
# ---------------------------------------------------------------------------
# E. Circuit breaker
# ---------------------------------------------------------------------------
class TestCircuitBreaker:
def test_opens_after_threshold(self):
cb = CircuitBreaker(threshold=3, pause_seconds=300)
assert cb.allow_claim() is True
cb.record_transient()
cb.record_transient()
assert cb.state == "closed"
cb.record_transient() # 3rd -> open
assert cb.state == "open"
assert cb.allow_claim() is False # paused, no CLI calls
def test_recovered_resets_streak(self):
cb = CircuitBreaker(threshold=3)
cb.record_transient()
cb.record_transient()
cb.record_recovered()
assert cb.consecutive_transient == 0
assert cb.state == "closed"
def test_half_open_after_pause_then_closed_on_success(self, monkeypatch):
cb = CircuitBreaker(threshold=2, pause_seconds=300)
cb.record_transient()
cb.record_transient() # open
assert cb.state == "open"
# Simulate the pause elapsing.
cb.opened_at -= 301
assert cb.allow_claim() is True # -> half-open (probe)
assert cb.state == "half-open"
cb.record_recovered() # probe succeeded
assert cb.state == "closed"
def test_half_open_reopens_on_transient(self):
cb = CircuitBreaker(threshold=2, pause_seconds=300)
cb.record_transient(); cb.record_transient() # open
cb.opened_at -= 301
cb.allow_claim() # half-open
assert cb.state == "half-open"
cb.record_transient() # probe failed -> re-open
assert cb.state == "open"
def test_breaker_blocks_worker_claim(self, monkeypatch):
monkeypatch.setattr("src.queue_worker.preflight.check",
lambda *a, **k: (True, "ok"))
called = {"launch": False}
monkeypatch.setattr("src.queue_worker.launcher.launch_job",
lambda job: called.__setitem__("launch", True))
cb = CircuitBreaker(threshold=1, pause_seconds=300)
cb.record_transient() # open immediately
w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb)
enqueue_job("analyst", "r")
w._drain_once()
assert called["launch"] is False # breaker open -> no claim, no CLI