feat: orchestrator MVP — webhooks, agent launcher, QG checks
This commit is contained in:
10
.env
Normal file
10
.env
Normal file
@@ -0,0 +1,10 @@
|
||||
ORCH_PLANE_API_URL=http://plane-app-api-1:8000
|
||||
ORCH_PLANE_API_TOKEN=
|
||||
ORCH_PLANE_WORKSPACE_SLUG=
|
||||
ORCH_PLANE_WEBHOOK_SECRET=
|
||||
ORCH_GITEA_URL=http://localhost:3000
|
||||
ORCH_GITEA_TOKEN=c81227b0dee2217f9ab3d28c3642a4578a1b9772
|
||||
ORCH_GITEA_WEBHOOK_SECRET=
|
||||
ORCH_CLAUDE_BIN=/usr/bin/claude
|
||||
ORCH_REPOS_DIR=/home/slin/repos
|
||||
ORCH_DB_PATH=/app/data/orchestrator.db
|
||||
10
.env.example
Normal file
10
.env.example
Normal file
@@ -0,0 +1,10 @@
|
||||
ORCH_PLANE_API_URL=http://plane-app-api-1:8000
|
||||
ORCH_PLANE_API_TOKEN=
|
||||
ORCH_PLANE_WORKSPACE_SLUG=
|
||||
ORCH_PLANE_WEBHOOK_SECRET=
|
||||
ORCH_GITEA_URL=http://localhost:3000
|
||||
ORCH_GITEA_TOKEN=
|
||||
ORCH_GITEA_WEBHOOK_SECRET=
|
||||
ORCH_CLAUDE_BIN=/usr/bin/claude
|
||||
ORCH_REPOS_DIR=/home/slin/repos
|
||||
ORCH_DB_PATH=/app/data/orchestrator.db
|
||||
7
Dockerfile
Normal file
7
Dockerfile
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM python:3.12-slim
|
||||
WORKDIR /app
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY src/ src/
|
||||
RUN mkdir -p /app/data/runs
|
||||
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8500"]
|
||||
70
README.md
Normal file
70
README.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Multi-Agent Orchestrator
|
||||
|
||||
FastAPI-сервис для оркестрации мульти-агентного пайплайна разработки.
|
||||
|
||||
## Что делает
|
||||
|
||||
- Принимает webhooks от **Plane** (task management) и **Gitea** (git events)
|
||||
- Проверяет Quality Gates перед переходом между стадиями
|
||||
- Запускает **Claude CLI** агентов (analyst, architect, developer, reviewer, tester)
|
||||
- Ведёт журнал событий в SQLite
|
||||
|
||||
## API Endpoints
|
||||
|
||||
| Method | Path | Описание |
|
||||
|--------|------|----------|
|
||||
| GET | `/health` | Health check |
|
||||
| GET | `/status` | Активные задачи |
|
||||
| POST | `/webhook/plane` | Plane webhook receiver |
|
||||
| POST | `/webhook/gitea` | Gitea webhook receiver |
|
||||
|
||||
## Настройка
|
||||
|
||||
```bash
|
||||
cp .env.example .env
|
||||
# Заполнить токены в .env
|
||||
```
|
||||
|
||||
## Запуск (Docker)
|
||||
|
||||
```bash
|
||||
docker compose up -d --build
|
||||
```
|
||||
|
||||
## Запуск (dev)
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
uvicorn src.main:app --reload --port 8500
|
||||
```
|
||||
|
||||
## Тесты
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
pytest tests/ -v
|
||||
```
|
||||
|
||||
## Переменные окружения
|
||||
|
||||
| Переменная | Описание | Default |
|
||||
|-----------|----------|---------|
|
||||
| `ORCH_PLANE_API_URL` | Plane API URL | `http://localhost:8091` |
|
||||
| `ORCH_PLANE_API_TOKEN` | Plane API token | — |
|
||||
| `ORCH_PLANE_WEBHOOK_SECRET` | Webhook secret для верификации | — |
|
||||
| `ORCH_GITEA_URL` | Gitea URL | `http://localhost:3000` |
|
||||
| `ORCH_GITEA_TOKEN` | Gitea API token | — |
|
||||
| `ORCH_GITEA_WEBHOOK_SECRET` | Gitea webhook secret | — |
|
||||
| `ORCH_CLAUDE_BIN` | Путь к Claude CLI | `/usr/bin/claude` |
|
||||
| `ORCH_REPOS_DIR` | Директория с репозиториями | `/home/slin/repos` |
|
||||
| `ORCH_DB_PATH` | Путь к SQLite БД | `/app/data/orchestrator.db` |
|
||||
|
||||
## Архитектура
|
||||
|
||||
```
|
||||
Plane webhook ──┐
|
||||
├──► Orchestrator ──► Quality Gates ──► Agent Launcher ──► Claude CLI
|
||||
Gitea webhook ──┘ │
|
||||
▼
|
||||
SQLite (events, tasks, agent_runs)
|
||||
```
|
||||
13
docker-compose.yml
Normal file
13
docker-compose.yml
Normal file
@@ -0,0 +1,13 @@
|
||||
services:
|
||||
orchestrator:
|
||||
build: .
|
||||
container_name: orchestrator
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "127.0.0.1:8500:8500"
|
||||
volumes:
|
||||
- ./data:/app/data
|
||||
- /home/slin/repos:/repos:ro
|
||||
env_file: .env
|
||||
environment:
|
||||
- ORCH_REPOS_DIR=/repos
|
||||
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
fastapi==0.115.0
|
||||
uvicorn[standard]==0.30.0
|
||||
pydantic-settings==2.5.0
|
||||
httpx==0.27.0
|
||||
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
0
src/agents/__init__.py
Normal file
0
src/agents/__init__.py
Normal file
105
src/agents/launcher.py
Normal file
105
src/agents/launcher.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import subprocess
|
||||
import os
|
||||
from ..config import settings
|
||||
from ..db import get_db
|
||||
|
||||
|
||||
class AgentLauncher:
|
||||
"""Launch Claude CLI agents for specific tasks."""
|
||||
|
||||
AGENT_CONFIGS = {
|
||||
"analyst": {
|
||||
"system_prompt": ".openclaw/agents/analyst.md",
|
||||
"task_file": ".task.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
"architect": {
|
||||
"system_prompt": ".openclaw/agents/architect.md",
|
||||
"task_file": ".task-arch.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
"developer": {
|
||||
"system_prompt": ".openclaw/agents/developer.md",
|
||||
"task_file": ".task-dev.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
"reviewer": {
|
||||
"system_prompt": ".openclaw/agents/reviewer.md",
|
||||
"task_file": ".task-review.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
"tester": {
|
||||
"system_prompt": ".openclaw/agents/tester.md",
|
||||
"task_file": ".task-test.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
}
|
||||
|
||||
def launch(self, agent: str, repo: str, task_content: str = None) -> int:
|
||||
"""
|
||||
Launch a Claude CLI agent.
|
||||
|
||||
Args:
|
||||
agent: Agent role (analyst, architect, developer, reviewer, tester)
|
||||
repo: Repository name
|
||||
task_content: Optional task content to write to task file
|
||||
|
||||
Returns:
|
||||
agent_run_id from DB
|
||||
"""
|
||||
config = self.AGENT_CONFIGS.get(agent)
|
||||
if not config:
|
||||
raise ValueError(f"Unknown agent: {agent}")
|
||||
|
||||
repo_path = os.path.join(settings.repos_dir, repo)
|
||||
if not os.path.isdir(repo_path):
|
||||
raise FileNotFoundError(f"Repo not found: {repo_path}")
|
||||
|
||||
# Write task file if content provided
|
||||
if task_content:
|
||||
task_path = os.path.join(repo_path, config["task_file"])
|
||||
with open(task_path, "w") as f:
|
||||
f.write(task_content)
|
||||
|
||||
# Record run in DB
|
||||
conn = get_db()
|
||||
cursor = conn.execute(
|
||||
"INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)",
|
||||
(agent,),
|
||||
)
|
||||
run_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
|
||||
# Prepare output log
|
||||
output_path = f"/app/data/runs/{run_id}.log"
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
# Build shell command
|
||||
cmd = (
|
||||
f'cd {repo_path} && {settings.claude_bin} --print '
|
||||
f'"$(cat {config["task_file"]})" '
|
||||
f'--system-prompt "$(cat {config["system_prompt"]})" '
|
||||
f'--allowedTools {config["allowed_tools"]}'
|
||||
)
|
||||
|
||||
# Launch as background process
|
||||
with open(output_path, "w") as log_file:
|
||||
subprocess.Popen(
|
||||
["bash", "-c", cmd],
|
||||
stdout=log_file,
|
||||
stderr=subprocess.STDOUT,
|
||||
cwd=repo_path,
|
||||
)
|
||||
|
||||
# Update DB with output path
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET output_path = ? WHERE id = ?",
|
||||
(output_path, run_id),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return run_id
|
||||
|
||||
|
||||
launcher = AgentLauncher()
|
||||
28
src/config.py
Normal file
28
src/config.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
# Plane
|
||||
plane_api_url: str = "http://localhost:8091"
|
||||
plane_api_token: str = ""
|
||||
plane_workspace_slug: str = ""
|
||||
plane_webhook_secret: str = ""
|
||||
|
||||
# Gitea
|
||||
gitea_url: str = "http://localhost:3000"
|
||||
gitea_token: str = ""
|
||||
gitea_webhook_secret: str = ""
|
||||
|
||||
# Claude CLI
|
||||
claude_bin: str = "/usr/bin/claude"
|
||||
repos_dir: str = "/home/slin/repos"
|
||||
|
||||
# DB
|
||||
db_path: str = "/app/data/orchestrator.db"
|
||||
|
||||
class Config:
|
||||
env_prefix = "ORCH_"
|
||||
env_file = ".env"
|
||||
|
||||
|
||||
settings = Settings()
|
||||
42
src/db.py
Normal file
42
src/db.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import sqlite3
|
||||
from .config import settings
|
||||
|
||||
|
||||
def get_db() -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(settings.db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def init_db():
|
||||
conn = get_db()
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT DEFAULT (datetime('now')),
|
||||
source TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
processed INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
plane_id TEXT,
|
||||
repo TEXT NOT NULL,
|
||||
branch TEXT,
|
||||
stage TEXT DEFAULT 'created',
|
||||
agent_running TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
updated_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS agent_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
task_id INTEGER REFERENCES tasks(id),
|
||||
agent TEXT NOT NULL,
|
||||
started_at TEXT DEFAULT (datetime('now')),
|
||||
finished_at TEXT,
|
||||
exit_code INTEGER,
|
||||
output_path TEXT
|
||||
);
|
||||
""")
|
||||
conn.close()
|
||||
32
src/main.py
Normal file
32
src/main.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from fastapi import FastAPI
|
||||
from contextlib import asynccontextmanager
|
||||
from .db import init_db
|
||||
from .webhooks.plane import router as plane_router
|
||||
from .webhooks.gitea import router as gitea_router
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
|
||||
app.include_router(plane_router, prefix="/webhook")
|
||||
app.include_router(gitea_router, prefix="/webhook")
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "ok", "service": "orchestrator"}
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def status():
|
||||
from .db import get_db
|
||||
conn = get_db()
|
||||
tasks = conn.execute(
|
||||
"SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return {"active_tasks": [dict(t) for t in tasks]}
|
||||
0
src/qg/__init__.py
Normal file
0
src/qg/__init__.py
Normal file
26
src/qg/checks.py
Normal file
26
src/qg/checks.py
Normal file
@@ -0,0 +1,26 @@
|
||||
# Quality Gate checks placeholder
|
||||
# Will be expanded as pipeline matures
|
||||
|
||||
|
||||
def check_analysis_complete(task_id: int) -> bool:
|
||||
"""Check if analysis artifacts exist."""
|
||||
# TODO: verify .task-arch.md exists in repo
|
||||
return True
|
||||
|
||||
|
||||
def check_architecture_approved(task_id: int) -> bool:
|
||||
"""Check if architecture was approved in Plane."""
|
||||
# TODO: check Plane comment for :approved:
|
||||
return False
|
||||
|
||||
|
||||
def check_ci_green(repo: str, branch: str) -> bool:
|
||||
"""Check if CI status is green for branch."""
|
||||
# TODO: query Gitea commit status API
|
||||
return False
|
||||
|
||||
|
||||
def check_review_approved(repo: str, pr_number: int) -> bool:
|
||||
"""Check if PR has approved review."""
|
||||
# TODO: query Gitea PR reviews API
|
||||
return False
|
||||
0
src/webhooks/__init__.py
Normal file
0
src/webhooks/__init__.py
Normal file
54
src/webhooks/gitea.py
Normal file
54
src/webhooks/gitea.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from fastapi import APIRouter, Request
|
||||
import json
|
||||
from ..db import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/gitea")
|
||||
async def gitea_webhook(request: Request):
|
||||
"""Handle Gitea webhook events."""
|
||||
body = await request.body()
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("gitea", event_type, body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
if event_type == "push":
|
||||
await handle_push(payload, conn)
|
||||
elif event_type == "pull_request":
|
||||
await handle_pr(payload, conn)
|
||||
elif event_type == "status":
|
||||
await handle_ci_status(payload, conn)
|
||||
|
||||
conn.close()
|
||||
return {"status": "accepted"}
|
||||
|
||||
|
||||
async def handle_push(payload: dict, conn):
|
||||
"""Push event — log for now."""
|
||||
pass
|
||||
|
||||
|
||||
async def handle_pr(payload: dict, conn):
|
||||
"""PR event — check reviews, CI status."""
|
||||
action = payload.get("action", "")
|
||||
pr = payload.get("pull_request", {})
|
||||
|
||||
if action == "reviewed" and pr.get("state") == "approved":
|
||||
# TODO: QG-5 check -> launch Tester
|
||||
pass
|
||||
|
||||
|
||||
async def handle_ci_status(payload: dict, conn):
|
||||
"""CI status update — check if all green -> advance."""
|
||||
state = payload.get("state", "")
|
||||
if state == "success":
|
||||
# TODO: Check all required contexts green -> advance stage
|
||||
pass
|
||||
49
src/webhooks/plane.py
Normal file
49
src/webhooks/plane.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from fastapi import APIRouter, Request
|
||||
import json
|
||||
from ..db import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/plane")
|
||||
async def plane_webhook(request: Request):
|
||||
"""Handle Plane webhook events."""
|
||||
body = await request.body()
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("plane", payload.get("event", "unknown"), body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
event = payload.get("event")
|
||||
data = payload.get("data", {})
|
||||
|
||||
if event == "work_item.created":
|
||||
await handle_work_item_created(data, conn)
|
||||
elif event == "comment.created":
|
||||
await handle_comment(data, conn)
|
||||
|
||||
conn.close()
|
||||
return {"status": "accepted"}
|
||||
|
||||
|
||||
async def handle_work_item_created(data: dict, conn):
|
||||
"""New work item -> create task record."""
|
||||
plane_id = data.get("id", "")
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, repo, stage) VALUES (?, ?, ?)",
|
||||
(plane_id, "enduro-trails", "analysis"),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
async def handle_comment(data: dict, conn):
|
||||
"""Check for :approved: reaction -> advance stage."""
|
||||
comment_body = data.get("comment", "")
|
||||
if ":approved:" in comment_body:
|
||||
# TODO: Determine which task, advance QG
|
||||
pass
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
65
tests/test_webhooks.py
Normal file
65
tests/test_webhooks.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
# Override DB path before importing app
|
||||
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orchestrator.db")
|
||||
|
||||
from src.main import app
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_health():
|
||||
resp = client.get("/health")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ok"
|
||||
assert resp.json()["service"] == "orchestrator"
|
||||
|
||||
|
||||
def test_plane_webhook_accepts():
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "work_item.created",
|
||||
"data": {"id": "test-123", "name": "Test task", "project": "proj-1"}
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
def test_plane_webhook_comment():
|
||||
resp = client.post("/webhook/plane", json={
|
||||
"event": "comment.created",
|
||||
"data": {"comment": "LGTM :approved:"}
|
||||
})
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
def test_gitea_webhook_push():
|
||||
resp = client.post(
|
||||
"/webhook/gitea",
|
||||
json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}},
|
||||
headers={"X-Gitea-Event": "push"}
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
def test_gitea_webhook_pr():
|
||||
resp = client.post(
|
||||
"/webhook/gitea",
|
||||
json={
|
||||
"action": "reviewed",
|
||||
"pull_request": {"state": "approved", "number": 1}
|
||||
},
|
||||
headers={"X-Gitea-Event": "pull_request"}
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "accepted"
|
||||
|
||||
|
||||
def test_status_endpoint():
|
||||
resp = client.get("/status")
|
||||
assert resp.status_code == 200
|
||||
assert "active_tasks" in resp.json()
|
||||
Reference in New Issue
Block a user