Files
wiki/tasks/multi-agent/DEV_TASK_ORCHESTRATOR_MVP.md
2026-05-19 16:00:16 +03:00

20 KiB
Raw Blame History

DEV TASK: Orchestrator MVP

Статус: Ready for dev
Проект: multi-agent
Фаза: 2
BRD: tasks/multi-agent/BRD.md


Цель

FastAPI-сервис, принимающий webhooks от Plane и Gitea, проверяющий Quality Gates и запускающий Claude Code CLI агентов на mva154.

Архитектура

Orchestrator — stateless FastAPI-приложение. Получает webhook-события, определяет какой агент должен запуститься, проверяет QG-условия, запускает claude CLI в headless-режиме. Состояние хранится в SQLite (журнал событий + текущие задачи). Деплой — Docker на mva154.

Стек / Зависимости

  • Python 3.12 + FastAPI + uvicorn
  • SQLite (журнал событий)
  • httpx (вызовы Plane/Gitea API)
  • subprocess (запуск Claude CLI)

Инфраструктура

Параметр Значение
Сервер slin@82.22.50.71 (mva154)
Рабочая директория /home/slin/repos/orchestrator/
Gitea repo admin/agent-dev (переименовать или создать admin/orchestrator)
Деплой docker compose up -d
URL https://openclaw.mva154.duckdns.org/orchestrator/
Порт контейнера 8500

Файловая карта

Действие Файл Ответственность
Создать src/main.py FastAPI app, роутеры
Создать src/webhooks/plane.py Обработка Plane webhook events
Создать src/webhooks/gitea.py Обработка Gitea webhook events
Создать src/agents/launcher.py Запуск Claude CLI агентов
Создать src/qg/checks.py Quality Gate проверки
Создать src/db.py SQLite: журнал событий, задачи
Создать src/config.py Конфигурация из env
Создать Dockerfile Python 3.12-slim + deps
Создать docker-compose.yml Сервис + volume для SQLite
Создать requirements.txt Зависимости
Создать .env.example Шаблон переменных
Создать tests/test_webhooks.py Тесты webhook-обработки
Создать README.md Документация

Задачи

Task 1: Скелет проекта + Health endpoint

Файлы:

  • Создать: src/main.py, src/config.py, src/db.py
  • Создать: requirements.txt, Dockerfile, docker-compose.yml, .env.example
  • Создать: README.md

Шаги:

  • 1.1 Создать структуру проекта
orchestrator/
├── src/
│   ├── __init__.py
│   ├── main.py          # FastAPI app
│   ├── config.py        # Settings from env
│   ├── db.py            # SQLite init + helpers
│   ├── webhooks/
│   │   ├── __init__.py
│   │   ├── plane.py
│   │   └── gitea.py
│   ├── agents/
│   │   ├── __init__.py
│   │   └── launcher.py
│   └── qg/
│       ├── __init__.py
│       └── checks.py
├── tests/
│   ├── __init__.py
│   └── test_webhooks.py
├── data/               # SQLite DB (volume mount)
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── README.md
  • 1.2 src/config.py — Pydantic Settings
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Plane
    plane_api_url: str = "http://localhost:8091"
    plane_api_token: str = ""
    plane_workspace_slug: str = ""
    plane_webhook_secret: str = ""
    
    # Gitea
    gitea_url: str = "http://localhost:3000"
    gitea_token: str = ""
    gitea_webhook_secret: str = ""
    
    # Claude CLI
    claude_bin: str = "/usr/bin/claude"
    repos_dir: str = "/home/slin/repos"
    
    # DB
    db_path: str = "/app/data/orchestrator.db"
    
    class Config:
        env_prefix = "ORCH_"
        env_file = ".env"

settings = Settings()
  • 1.3 src/db.py — SQLite schema
import sqlite3
from .config import settings

def get_db():
    conn = sqlite3.connect(settings.db_path)
    conn.row_factory = sqlite3.Row
    return conn

def init_db():
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT DEFAULT (datetime('now')),
            source TEXT NOT NULL,  -- 'plane' | 'gitea'
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            processed INTEGER DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            plane_id TEXT,
            repo TEXT NOT NULL,
            branch TEXT,
            stage TEXT DEFAULT 'created',
            agent_running TEXT,
            created_at TEXT DEFAULT (datetime('now')),
            updated_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS agent_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER REFERENCES tasks(id),
            agent TEXT NOT NULL,
            started_at TEXT DEFAULT (datetime('now')),
            finished_at TEXT,
            exit_code INTEGER,
            output_path TEXT
        );
    """)
    conn.close()
  • 1.4 src/main.py — FastAPI app
from fastapi import FastAPI
from contextlib import asynccontextmanager
from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_db()
    yield

app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")

@app.get("/health")
async def health():
    return {"status": "ok", "service": "orchestrator"}

@app.get("/status")
async def status():
    from .db import get_db
    conn = get_db()
    tasks = conn.execute("SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10").fetchall()
    conn.close()
    return {"active_tasks": [dict(t) for t in tasks]}
  • 1.5 requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.0
pydantic-settings==2.5.0
httpx==0.27.0
  • 1.6 Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
RUN mkdir -p /app/data
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8500"]
  • 1.7 docker-compose.yml
services:
  orchestrator:
    build: .
    container_name: orchestrator
    restart: unless-stopped
    ports:
      - "127.0.0.1:8500:8500"
    volumes:
      - ./data:/app/data
      - /home/slin/repos:/repos:ro
    env_file: .env
    environment:
      - ORCH_REPOS_DIR=/repos
  • 1.8 .env.example
ORCH_PLANE_API_URL=http://plane-app-api-1:8000
ORCH_PLANE_API_TOKEN=
ORCH_PLANE_WORKSPACE_SLUG=
ORCH_PLANE_WEBHOOK_SECRET=
ORCH_GITEA_URL=http://localhost:3000
ORCH_GITEA_TOKEN=
ORCH_GITEA_WEBHOOK_SECRET=
ORCH_CLAUDE_BIN=/usr/bin/claude
ORCH_REPOS_DIR=/home/slin/repos
ORCH_DB_PATH=/app/data/orchestrator.db

Критерий готовности: docker compose up -dcurl localhost:8500/health{"status": "ok"}


Task 2: Webhook handlers (Plane + Gitea)

Файлы:

  • Создать: src/webhooks/plane.py, src/webhooks/gitea.py

Шаги:

  • 2.1 src/webhooks/plane.py
from fastapi import APIRouter, Request, HTTPException
import json
from ..db import get_db
from ..config import settings

router = APIRouter()

@router.post("/plane")
async def plane_webhook(request: Request):
    """Handle Plane webhook events."""
    body = await request.body()
    payload = json.loads(body)
    
    # Log event
    conn = get_db()
    conn.execute(
        "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
        ("plane", payload.get("event", "unknown"), body.decode())
    )
    conn.commit()
    
    event = payload.get("event")
    data = payload.get("data", {})
    
    if event == "work_item.created":
        await handle_work_item_created(data, conn)
    elif event == "comment.created":
        await handle_comment(data, conn)
    
    conn.close()
    return {"status": "accepted"}

async def handle_work_item_created(data: dict, conn):
    """New work item → create branch + start Analyst."""
    plane_id = data.get("id", "")
    name = data.get("name", "")
    project_id = data.get("project", "")
    
    # Create task record
    conn.execute(
        "INSERT INTO tasks (plane_id, repo, stage) VALUES (?, ?, ?)",
        (plane_id, "enduro-trails", "analysis")
    )
    conn.commit()
    
    # TODO: Create git branch
    # TODO: Launch Analyst agent

async def handle_comment(data: dict, conn):
    """Check for :approved: reaction → advance stage."""
    comment_body = data.get("comment", "")
    if ":approved:" in comment_body:
        # TODO: Determine which task, advance QG
        pass
  • 2.2 src/webhooks/gitea.py
from fastapi import APIRouter, Request
import json
from ..db import get_db

router = APIRouter()

@router.post("/gitea")
async def gitea_webhook(request: Request):
    """Handle Gitea webhook events."""
    body = await request.body()
    payload = json.loads(body)
    
    # Log event
    conn = get_db()
    event_type = request.headers.get("X-Gitea-Event", "unknown")
    conn.execute(
        "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
        ("gitea", event_type, body.decode())
    )
    conn.commit()
    
    if event_type == "push":
        await handle_push(payload, conn)
    elif event_type == "pull_request":
        await handle_pr(payload, conn)
    elif event_type == "status":
        await handle_ci_status(payload, conn)
    
    conn.close()
    return {"status": "accepted"}

async def handle_push(payload: dict, conn):
    """Push event — check if CI should trigger next stage."""
    ref = payload.get("ref", "")
    repo = payload.get("repository", {}).get("name", "")
    # Log for now
    pass

async def handle_pr(payload: dict, conn):
    """PR event — check reviews, CI status."""
    action = payload.get("action", "")
    pr = payload.get("pull_request", {})
    
    if action == "reviewed" and pr.get("state") == "approved":
        # TODO: QG-5 check → launch Tester
        pass

async def handle_ci_status(payload: dict, conn):
    """CI status update — check if all green → advance."""
    state = payload.get("state", "")
    context = payload.get("context", "")
    sha = payload.get("sha", "")
    
    if state == "success":
        # TODO: Check all required contexts green → advance stage
        pass

Критерий готовности: POST to /webhook/plane и /webhook/gitea → 200, events записываются в SQLite


Task 3: Agent Launcher

Файлы:

  • Создать: src/agents/launcher.py

Шаги:

  • 3.1 src/agents/launcher.py
import subprocess
import os
import json
from datetime import datetime
from ..config import settings
from ..db import get_db

class AgentLauncher:
    """Launch Claude CLI agents for specific tasks."""
    
    AGENT_CONFIGS = {
        "analyst": {
            "system_prompt": ".openclaw/agents/analyst.md",
            "task_file": ".task.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "architect": {
            "system_prompt": ".openclaw/agents/architect.md",
            "task_file": ".task-arch.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "developer": {
            "system_prompt": ".openclaw/agents/developer.md",
            "task_file": ".task-dev.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "reviewer": {
            "system_prompt": ".openclaw/agents/reviewer.md",
            "task_file": ".task-review.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
        "tester": {
            "system_prompt": ".openclaw/agents/tester.md",
            "task_file": ".task-test.md",
            "allowed_tools": "Read,Write,Edit,Bash",
        },
    }
    
    def launch(self, agent: str, repo: str, task_content: str = None) -> int:
        """
        Launch a Claude CLI agent.
        
        Args:
            agent: Agent role (analyst, architect, developer, reviewer, tester)
            repo: Repository name (e.g., 'enduro-trails')
            task_content: Optional task content (if not using .task file)
        
        Returns:
            agent_run_id from DB
        """
        config = self.AGENT_CONFIGS.get(agent)
        if not config:
            raise ValueError(f"Unknown agent: {agent}")
        
        repo_path = os.path.join(settings.repos_dir, repo)
        if not os.path.isdir(repo_path):
            raise FileNotFoundError(f"Repo not found: {repo_path}")
        
        # Write task file if content provided
        if task_content:
            task_path = os.path.join(repo_path, config["task_file"])
            with open(task_path, "w") as f:
                f.write(task_content)
        
        # Record run in DB
        conn = get_db()
        cursor = conn.execute(
            "INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)",
            (agent,)
        )
        run_id = cursor.lastrowid
        conn.commit()
        
        # Build command
        system_prompt_path = os.path.join(repo_path, config["system_prompt"])
        task_file_path = os.path.join(repo_path, config["task_file"])
        
        output_path = f"/app/data/runs/{run_id}.log"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        cmd = [
            settings.claude_bin,
            "--print",
            f"$(cat {task_file_path})",
            "--system-prompt", f"$(cat {system_prompt_path})",
            "--allowedTools", config["allowed_tools"],
        ]
        
        # Launch as background process
        with open(output_path, "w") as log_file:
            process = subprocess.Popen(
                ["bash", "-c", f'cd {repo_path} && {settings.claude_bin} --print "$(cat {config["task_file"]})" --system-prompt "$(cat {config["system_prompt"]})" --allowedTools {config["allowed_tools"]}'],
                stdout=log_file,
                stderr=subprocess.STDOUT,
                cwd=repo_path,
            )
        
        # Update DB with PID
        conn.execute(
            "UPDATE agent_runs SET output_path = ? WHERE id = ?",
            (output_path, run_id)
        )
        conn.commit()
        conn.close()
        
        return run_id

launcher = AgentLauncher()

Критерий готовности: launcher.launch("analyst", "enduro-trails", "...") запускает Claude CLI процесс, записывает в DB


Task 4: Тесты + README

Файлы:

  • Создать: tests/test_webhooks.py, README.md

Шаги:

  • 4.1 tests/test_webhooks.py
import pytest
from fastapi.testclient import TestClient
from src.main import app

client = TestClient(app)

def test_health():
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "ok"

def test_plane_webhook_accepts():
    resp = client.post("/webhook/plane", json={
        "event": "work_item.created",
        "data": {"id": "test-123", "name": "Test task", "project": "proj-1"}
    })
    assert resp.status_code == 200
    assert resp.json()["status"] == "accepted"

def test_gitea_webhook_accepts():
    resp = client.post(
        "/webhook/gitea",
        json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}},
        headers={"X-Gitea-Event": "push"}
    )
    assert resp.status_code == 200
    assert resp.json()["status"] == "accepted"

def test_status_endpoint():
    resp = client.get("/status")
    assert resp.status_code == 200
    assert "active_tasks" in resp.json()
  • 4.2 README.md с описанием API, настройки, деплоя

Критерий готовности: pytest tests/ → all green


Task 5: Деплой на mva154

Шаги:

  • 5.1 Инициализировать git-репо, push в Gitea admin/orchestrator (или использовать admin/agent-dev)
cd /home/slin/repos/orchestrator
git init
git add .
git commit -m "feat: orchestrator MVP — webhooks, agent launcher, QG checks"
git remote add origin http://localhost:3000/admin/agent-dev.git
git push -u origin main
  • 5.2 Создать .env из .env.example, заполнить токены

  • 5.3 docker compose up -d --build

  • 5.4 Добавить Nginx location

location /orchestrator/ {
    proxy_pass http://127.0.0.1:8500/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
}
  • 5.5 Настроить webhook в Gitea: Settings → Webhooks → http://localhost:8500/webhook/gitea

  • 5.6 Проверить

curl -s https://openclaw.mva154.duckdns.org/orchestrator/health
# {"status": "ok", "service": "orchestrator"}

Критерий готовности: Orchestrator доступен по URL, принимает webhooks, записывает в DB


Проверка (Acceptance)

# Проверка Команда / Действие Ожидаемый результат
1 Health endpoint curl .../orchestrator/health {"status": "ok"}
2 Plane webhook POST JSON → /webhook/plane 200, event в SQLite
3 Gitea webhook Push в feature-ветку Event в SQLite
4 Status endpoint curl .../orchestrator/status JSON с active_tasks
5 Agent launch POST test task Claude CLI запускается
6 Tests pass pytest tests/ All green

Ограничения и контекст

  • ⚠️ Claude CLI на mva154 — /usr/bin/claude, auth через claude.ai (Max subscription)
  • ⚠️ Gitea доступен по http://localhost:3000 (не по внешнему домену — DNS нестабилен)
  • ⚠️ Plane API — порт 8091 (proxy) или напрямую к API-контейнеру 172.21.0.6:8000
  • ⚠️ Repos лежат в /home/slin/repos/ — монтировать как volume read-only
  • ⚠️ Orchestrator НЕ должен сам мержить PR или деплоить — только запускать агентов
  • 🚫 НЕ использовать Docker-in-Docker для runner'а
  • 🚫 НЕ хардкодить токены — только через .env

Деплой-чеклист

  • Код написан и тесты проходят
  • Docker image собирается
  • .env заполнен
  • docker compose up -d — контейнер running
  • Nginx location добавлен, nginx -t && systemctl reload nginx
  • Health endpoint отвечает
  • Gitea webhook настроен и доставляется
  • Нет ошибок в docker logs orchestrator --tail 50

Создано: 2026-05-19 | Автор ТЗ: Стрим | Исполнитель: Dev-агент