Files
wiki/tasks/multi-agent/DEV_TASK_ORCHESTRATOR_MVP.md
2026-05-19 16:00:16 +03:00

652 lines
20 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# DEV TASK: Orchestrator MVP
**Статус:** Ready for dev
**Проект:** multi-agent
**Фаза:** 2
**BRD:** tasks/multi-agent/BRD.md
---
## Цель
> FastAPI-сервис, принимающий webhooks от Plane и Gitea, проверяющий Quality Gates и запускающий Claude Code CLI агентов на mva154.
## Архитектура
Orchestrator — stateless FastAPI-приложение. Получает webhook-события, определяет какой агент должен запуститься, проверяет QG-условия, запускает `claude` CLI в headless-режиме. Состояние хранится в SQLite (журнал событий + текущие задачи). Деплой — Docker на mva154.
## Стек / Зависимости
- Python 3.12 + FastAPI + uvicorn
- SQLite (журнал событий)
- httpx (вызовы Plane/Gitea API)
- subprocess (запуск Claude CLI)
---
## Инфраструктура
| Параметр | Значение |
|----------|----------|
| Сервер | `slin@82.22.50.71` (mva154) |
| Рабочая директория | `/home/slin/repos/orchestrator/` |
| Gitea repo | `admin/agent-dev` (переименовать или создать `admin/orchestrator`) |
| Деплой | `docker compose up -d` |
| URL | `https://openclaw.mva154.duckdns.org/orchestrator/` |
| Порт контейнера | 8500 |
---
## Файловая карта
| Действие | Файл | Ответственность |
|----------|------|-----------------|
| Создать | `src/main.py` | FastAPI app, роутеры |
| Создать | `src/webhooks/plane.py` | Обработка Plane webhook events |
| Создать | `src/webhooks/gitea.py` | Обработка Gitea webhook events |
| Создать | `src/agents/launcher.py` | Запуск Claude CLI агентов |
| Создать | `src/qg/checks.py` | Quality Gate проверки |
| Создать | `src/db.py` | SQLite: журнал событий, задачи |
| Создать | `src/config.py` | Конфигурация из env |
| Создать | `Dockerfile` | Python 3.12-slim + deps |
| Создать | `docker-compose.yml` | Сервис + volume для SQLite |
| Создать | `requirements.txt` | Зависимости |
| Создать | `.env.example` | Шаблон переменных |
| Создать | `tests/test_webhooks.py` | Тесты webhook-обработки |
| Создать | `README.md` | Документация |
---
## Задачи
### Task 1: Скелет проекта + Health endpoint
**Файлы:**
- Создать: `src/main.py`, `src/config.py`, `src/db.py`
- Создать: `requirements.txt`, `Dockerfile`, `docker-compose.yml`, `.env.example`
- Создать: `README.md`
**Шаги:**
- [ ] **1.1** Создать структуру проекта
```
orchestrator/
├── src/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── config.py # Settings from env
│ ├── db.py # SQLite init + helpers
│ ├── webhooks/
│ │ ├── __init__.py
│ │ ├── plane.py
│ │ └── gitea.py
│ ├── agents/
│ │ ├── __init__.py
│ │ └── launcher.py
│ └── qg/
│ ├── __init__.py
│ └── checks.py
├── tests/
│ ├── __init__.py
│ └── test_webhooks.py
├── data/ # SQLite DB (volume mount)
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── README.md
```
- [ ] **1.2** `src/config.py` — Pydantic Settings
```python
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Plane
plane_api_url: str = "http://localhost:8091"
plane_api_token: str = ""
plane_workspace_slug: str = ""
plane_webhook_secret: str = ""
# Gitea
gitea_url: str = "http://localhost:3000"
gitea_token: str = ""
gitea_webhook_secret: str = ""
# Claude CLI
claude_bin: str = "/usr/bin/claude"
repos_dir: str = "/home/slin/repos"
# DB
db_path: str = "/app/data/orchestrator.db"
class Config:
env_prefix = "ORCH_"
env_file = ".env"
settings = Settings()
```
- [ ] **1.3** `src/db.py` — SQLite schema
```python
import sqlite3
from .config import settings
def get_db():
conn = sqlite3.connect(settings.db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db()
conn.executescript("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
source TEXT NOT NULL, -- 'plane' | 'gitea'
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plane_id TEXT,
repo TEXT NOT NULL,
branch TEXT,
stage TEXT DEFAULT 'created',
agent_running TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS agent_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
agent TEXT NOT NULL,
started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT,
exit_code INTEGER,
output_path TEXT
);
""")
conn.close()
```
- [ ] **1.4** `src/main.py` — FastAPI app
```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
yield
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")
@app.get("/health")
async def health():
return {"status": "ok", "service": "orchestrator"}
@app.get("/status")
async def status():
from .db import get_db
conn = get_db()
tasks = conn.execute("SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10").fetchall()
conn.close()
return {"active_tasks": [dict(t) for t in tasks]}
```
- [ ] **1.5** `requirements.txt`
```
fastapi==0.115.0
uvicorn[standard]==0.30.0
pydantic-settings==2.5.0
httpx==0.27.0
```
- [ ] **1.6** `Dockerfile`
```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
RUN mkdir -p /app/data
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8500"]
```
- [ ] **1.7** `docker-compose.yml`
```yaml
services:
orchestrator:
build: .
container_name: orchestrator
restart: unless-stopped
ports:
- "127.0.0.1:8500:8500"
volumes:
- ./data:/app/data
- /home/slin/repos:/repos:ro
env_file: .env
environment:
- ORCH_REPOS_DIR=/repos
```
- [ ] **1.8** `.env.example`
```
ORCH_PLANE_API_URL=http://plane-app-api-1:8000
ORCH_PLANE_API_TOKEN=
ORCH_PLANE_WORKSPACE_SLUG=
ORCH_PLANE_WEBHOOK_SECRET=
ORCH_GITEA_URL=http://localhost:3000
ORCH_GITEA_TOKEN=
ORCH_GITEA_WEBHOOK_SECRET=
ORCH_CLAUDE_BIN=/usr/bin/claude
ORCH_REPOS_DIR=/home/slin/repos
ORCH_DB_PATH=/app/data/orchestrator.db
```
**Критерий готовности:** `docker compose up -d``curl localhost:8500/health``{"status": "ok"}`
---
### Task 2: Webhook handlers (Plane + Gitea)
**Файлы:**
- Создать: `src/webhooks/plane.py`, `src/webhooks/gitea.py`
**Шаги:**
- [ ] **2.1** `src/webhooks/plane.py`
```python
from fastapi import APIRouter, Request, HTTPException
import json
from ..db import get_db
from ..config import settings
router = APIRouter()
@router.post("/plane")
async def plane_webhook(request: Request):
"""Handle Plane webhook events."""
body = await request.body()
payload = json.loads(body)
# Log event
conn = get_db()
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("plane", payload.get("event", "unknown"), body.decode())
)
conn.commit()
event = payload.get("event")
data = payload.get("data", {})
if event == "work_item.created":
await handle_work_item_created(data, conn)
elif event == "comment.created":
await handle_comment(data, conn)
conn.close()
return {"status": "accepted"}
async def handle_work_item_created(data: dict, conn):
"""New work item → create branch + start Analyst."""
plane_id = data.get("id", "")
name = data.get("name", "")
project_id = data.get("project", "")
# Create task record
conn.execute(
"INSERT INTO tasks (plane_id, repo, stage) VALUES (?, ?, ?)",
(plane_id, "enduro-trails", "analysis")
)
conn.commit()
# TODO: Create git branch
# TODO: Launch Analyst agent
async def handle_comment(data: dict, conn):
"""Check for :approved: reaction → advance stage."""
comment_body = data.get("comment", "")
if ":approved:" in comment_body:
# TODO: Determine which task, advance QG
pass
```
- [ ] **2.2** `src/webhooks/gitea.py`
```python
from fastapi import APIRouter, Request
import json
from ..db import get_db
router = APIRouter()
@router.post("/gitea")
async def gitea_webhook(request: Request):
"""Handle Gitea webhook events."""
body = await request.body()
payload = json.loads(body)
# Log event
conn = get_db()
event_type = request.headers.get("X-Gitea-Event", "unknown")
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("gitea", event_type, body.decode())
)
conn.commit()
if event_type == "push":
await handle_push(payload, conn)
elif event_type == "pull_request":
await handle_pr(payload, conn)
elif event_type == "status":
await handle_ci_status(payload, conn)
conn.close()
return {"status": "accepted"}
async def handle_push(payload: dict, conn):
"""Push event — check if CI should trigger next stage."""
ref = payload.get("ref", "")
repo = payload.get("repository", {}).get("name", "")
# Log for now
pass
async def handle_pr(payload: dict, conn):
"""PR event — check reviews, CI status."""
action = payload.get("action", "")
pr = payload.get("pull_request", {})
if action == "reviewed" and pr.get("state") == "approved":
# TODO: QG-5 check → launch Tester
pass
async def handle_ci_status(payload: dict, conn):
"""CI status update — check if all green → advance."""
state = payload.get("state", "")
context = payload.get("context", "")
sha = payload.get("sha", "")
if state == "success":
# TODO: Check all required contexts green → advance stage
pass
```
**Критерий готовности:** POST to `/webhook/plane` и `/webhook/gitea` → 200, events записываются в SQLite
---
### Task 3: Agent Launcher
**Файлы:**
- Создать: `src/agents/launcher.py`
**Шаги:**
- [ ] **3.1** `src/agents/launcher.py`
```python
import subprocess
import os
import json
from datetime import datetime
from ..config import settings
from ..db import get_db
class AgentLauncher:
"""Launch Claude CLI agents for specific tasks."""
AGENT_CONFIGS = {
"analyst": {
"system_prompt": ".openclaw/agents/analyst.md",
"task_file": ".task.md",
"allowed_tools": "Read,Write,Edit,Bash",
},
"architect": {
"system_prompt": ".openclaw/agents/architect.md",
"task_file": ".task-arch.md",
"allowed_tools": "Read,Write,Edit,Bash",
},
"developer": {
"system_prompt": ".openclaw/agents/developer.md",
"task_file": ".task-dev.md",
"allowed_tools": "Read,Write,Edit,Bash",
},
"reviewer": {
"system_prompt": ".openclaw/agents/reviewer.md",
"task_file": ".task-review.md",
"allowed_tools": "Read,Write,Edit,Bash",
},
"tester": {
"system_prompt": ".openclaw/agents/tester.md",
"task_file": ".task-test.md",
"allowed_tools": "Read,Write,Edit,Bash",
},
}
def launch(self, agent: str, repo: str, task_content: str = None) -> int:
"""
Launch a Claude CLI agent.
Args:
agent: Agent role (analyst, architect, developer, reviewer, tester)
repo: Repository name (e.g., 'enduro-trails')
task_content: Optional task content (if not using .task file)
Returns:
agent_run_id from DB
"""
config = self.AGENT_CONFIGS.get(agent)
if not config:
raise ValueError(f"Unknown agent: {agent}")
repo_path = os.path.join(settings.repos_dir, repo)
if not os.path.isdir(repo_path):
raise FileNotFoundError(f"Repo not found: {repo_path}")
# Write task file if content provided
if task_content:
task_path = os.path.join(repo_path, config["task_file"])
with open(task_path, "w") as f:
f.write(task_content)
# Record run in DB
conn = get_db()
cursor = conn.execute(
"INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)",
(agent,)
)
run_id = cursor.lastrowid
conn.commit()
# Build command
system_prompt_path = os.path.join(repo_path, config["system_prompt"])
task_file_path = os.path.join(repo_path, config["task_file"])
output_path = f"/app/data/runs/{run_id}.log"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
cmd = [
settings.claude_bin,
"--print",
f"$(cat {task_file_path})",
"--system-prompt", f"$(cat {system_prompt_path})",
"--allowedTools", config["allowed_tools"],
]
# Launch as background process
with open(output_path, "w") as log_file:
process = subprocess.Popen(
["bash", "-c", f'cd {repo_path} && {settings.claude_bin} --print "$(cat {config["task_file"]})" --system-prompt "$(cat {config["system_prompt"]})" --allowedTools {config["allowed_tools"]}'],
stdout=log_file,
stderr=subprocess.STDOUT,
cwd=repo_path,
)
# Update DB with PID
conn.execute(
"UPDATE agent_runs SET output_path = ? WHERE id = ?",
(output_path, run_id)
)
conn.commit()
conn.close()
return run_id
launcher = AgentLauncher()
```
**Критерий готовности:** `launcher.launch("analyst", "enduro-trails", "...")` запускает Claude CLI процесс, записывает в DB
---
### Task 4: Тесты + README
**Файлы:**
- Создать: `tests/test_webhooks.py`, `README.md`
**Шаги:**
- [ ] **4.1** `tests/test_webhooks.py`
```python
import pytest
from fastapi.testclient import TestClient
from src.main import app
client = TestClient(app)
def test_health():
resp = client.get("/health")
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
def test_plane_webhook_accepts():
resp = client.post("/webhook/plane", json={
"event": "work_item.created",
"data": {"id": "test-123", "name": "Test task", "project": "proj-1"}
})
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
def test_gitea_webhook_accepts():
resp = client.post(
"/webhook/gitea",
json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}},
headers={"X-Gitea-Event": "push"}
)
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
def test_status_endpoint():
resp = client.get("/status")
assert resp.status_code == 200
assert "active_tasks" in resp.json()
```
- [ ] **4.2** `README.md` с описанием API, настройки, деплоя
**Критерий готовности:** `pytest tests/` → all green
---
### Task 5: Деплой на mva154
**Шаги:**
- [ ] **5.1** Инициализировать git-репо, push в Gitea `admin/orchestrator` (или использовать `admin/agent-dev`)
```bash
cd /home/slin/repos/orchestrator
git init
git add .
git commit -m "feat: orchestrator MVP — webhooks, agent launcher, QG checks"
git remote add origin http://localhost:3000/admin/agent-dev.git
git push -u origin main
```
- [ ] **5.2** Создать `.env` из `.env.example`, заполнить токены
- [ ] **5.3** `docker compose up -d --build`
- [ ] **5.4** Добавить Nginx location
```nginx
location /orchestrator/ {
proxy_pass http://127.0.0.1:8500/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
```
- [ ] **5.5** Настроить webhook в Gitea: Settings → Webhooks → `http://localhost:8500/webhook/gitea`
- [ ] **5.6** Проверить
```bash
curl -s https://openclaw.mva154.duckdns.org/orchestrator/health
# {"status": "ok", "service": "orchestrator"}
```
**Критерий готовности:** Orchestrator доступен по URL, принимает webhooks, записывает в DB
---
## Проверка (Acceptance)
| # | Проверка | Команда / Действие | Ожидаемый результат |
|---|----------|-------------------|---------------------|
| 1 | Health endpoint | `curl .../orchestrator/health` | `{"status": "ok"}` |
| 2 | Plane webhook | POST JSON → `/webhook/plane` | 200, event в SQLite |
| 3 | Gitea webhook | Push в feature-ветку | Event в SQLite |
| 4 | Status endpoint | `curl .../orchestrator/status` | JSON с active_tasks |
| 5 | Agent launch | POST test task | Claude CLI запускается |
| 6 | Tests pass | `pytest tests/` | All green |
---
## Ограничения и контекст
- ⚠️ Claude CLI на mva154 — `/usr/bin/claude`, auth через `claude.ai` (Max subscription)
- ⚠️ Gitea доступен по `http://localhost:3000` (не по внешнему домену — DNS нестабилен)
- ⚠️ Plane API — порт 8091 (proxy) или напрямую к API-контейнеру `172.21.0.6:8000`
- ⚠️ Repos лежат в `/home/slin/repos/` — монтировать как volume read-only
- ⚠️ Orchestrator НЕ должен сам мержить PR или деплоить — только запускать агентов
- 🚫 НЕ использовать Docker-in-Docker для runner'а
- 🚫 НЕ хардкодить токены — только через .env
---
## Деплой-чеклист
- [ ] Код написан и тесты проходят
- [ ] Docker image собирается
- [ ] `.env` заполнен
- [ ] `docker compose up -d` — контейнер running
- [ ] Nginx location добавлен, `nginx -t && systemctl reload nginx`
- [ ] Health endpoint отвечает
- [ ] Gitea webhook настроен и доставляется
- [ ] Нет ошибок в `docker logs orchestrator --tail 50`
---
*Создано: 2026-05-19 | Автор ТЗ: Стрим | Исполнитель: Dev-агент*