commit daf8cdad9e4d4763ad13c8bbc7b4c748be55f7df Author: Dev Agent Date: Tue May 19 15:57:00 2026 +0300 feat: orchestrator MVP — webhooks, agent launcher, QG checks diff --git a/.env b/.env new file mode 100644 index 0000000..57481e8 --- /dev/null +++ b/.env @@ -0,0 +1,10 @@ +ORCH_PLANE_API_URL=http://plane-app-api-1:8000 +ORCH_PLANE_API_TOKEN= +ORCH_PLANE_WORKSPACE_SLUG= +ORCH_PLANE_WEBHOOK_SECRET= +ORCH_GITEA_URL=http://localhost:3000 +ORCH_GITEA_TOKEN=c81227b0dee2217f9ab3d28c3642a4578a1b9772 +ORCH_GITEA_WEBHOOK_SECRET= +ORCH_CLAUDE_BIN=/usr/bin/claude +ORCH_REPOS_DIR=/home/slin/repos +ORCH_DB_PATH=/app/data/orchestrator.db diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ffdb5cc --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +ORCH_PLANE_API_URL=http://plane-app-api-1:8000 +ORCH_PLANE_API_TOKEN= +ORCH_PLANE_WORKSPACE_SLUG= +ORCH_PLANE_WEBHOOK_SECRET= +ORCH_GITEA_URL=http://localhost:3000 +ORCH_GITEA_TOKEN= +ORCH_GITEA_WEBHOOK_SECRET= +ORCH_CLAUDE_BIN=/usr/bin/claude +ORCH_REPOS_DIR=/home/slin/repos +ORCH_DB_PATH=/app/data/orchestrator.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0b02d3a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY src/ src/ +RUN mkdir -p /app/data/runs +CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8500"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..9700b39 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +# Multi-Agent Orchestrator + +FastAPI-сервис для оркестрации мульти-агентного пайплайна разработки. + +## Что делает + +- Принимает webhooks от **Plane** (task management) и **Gitea** (git events) +- Проверяет Quality Gates перед переходом между стадиями +- Запускает **Claude CLI** агентов (analyst, architect, developer, reviewer, tester) +- Ведёт журнал событий в SQLite + +## API Endpoints + +| Method | Path | Описание | +|--------|------|----------| +| GET | `/health` | Health check | +| GET | `/status` | Активные задачи | +| POST | `/webhook/plane` | Plane webhook receiver | +| POST | `/webhook/gitea` | Gitea webhook receiver | + +## Настройка + +```bash +cp .env.example .env +# Заполнить токены в .env +``` + +## Запуск (Docker) + +```bash +docker compose up -d --build +``` + +## Запуск (dev) + +```bash +pip install -r requirements.txt +uvicorn src.main:app --reload --port 8500 +``` + +## Тесты + +```bash +pip install pytest +pytest tests/ -v +``` + +## Переменные окружения + +| Переменная | Описание | Default | +|-----------|----------|---------| +| `ORCH_PLANE_API_URL` | Plane API URL | `http://localhost:8091` | +| `ORCH_PLANE_API_TOKEN` | Plane API token | — | +| `ORCH_PLANE_WEBHOOK_SECRET` | Webhook secret для верификации | — | +| `ORCH_GITEA_URL` | Gitea URL | `http://localhost:3000` | +| `ORCH_GITEA_TOKEN` | Gitea API token | — | +| `ORCH_GITEA_WEBHOOK_SECRET` | Gitea webhook secret | — | +| `ORCH_CLAUDE_BIN` | Путь к Claude CLI | `/usr/bin/claude` | +| `ORCH_REPOS_DIR` | Директория с репозиториями | `/home/slin/repos` | +| `ORCH_DB_PATH` | Путь к SQLite БД | `/app/data/orchestrator.db` | + +## Архитектура + +``` +Plane webhook ──┐ + ├──► Orchestrator ──► Quality Gates ──► Agent Launcher ──► Claude CLI +Gitea webhook ──┘ │ + ▼ + SQLite (events, tasks, agent_runs) +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7a88c68 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +services: + orchestrator: + build: . + container_name: orchestrator + restart: unless-stopped + ports: + - "127.0.0.1:8500:8500" + volumes: + - ./data:/app/data + - /home/slin/repos:/repos:ro + env_file: .env + environment: + - ORCH_REPOS_DIR=/repos diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..92ef7b6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.115.0 +uvicorn[standard]==0.30.0 +pydantic-settings==2.5.0 +httpx==0.27.0 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/agents/__init__.py b/src/agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/agents/launcher.py b/src/agents/launcher.py new file mode 100644 index 0000000..58a47a0 --- /dev/null +++ b/src/agents/launcher.py @@ -0,0 +1,105 @@ +import subprocess +import os +from ..config import settings +from ..db import get_db + + +class AgentLauncher: + """Launch Claude CLI agents for specific tasks.""" + + AGENT_CONFIGS = { + "analyst": { + "system_prompt": ".openclaw/agents/analyst.md", + "task_file": ".task.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, + "architect": { + "system_prompt": ".openclaw/agents/architect.md", + "task_file": ".task-arch.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, + "developer": { + "system_prompt": ".openclaw/agents/developer.md", + "task_file": ".task-dev.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, + "reviewer": { + "system_prompt": ".openclaw/agents/reviewer.md", + "task_file": ".task-review.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, + "tester": { + "system_prompt": ".openclaw/agents/tester.md", + "task_file": ".task-test.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, + } + + def launch(self, agent: str, repo: str, task_content: str = None) -> int: + """ + Launch a Claude CLI agent. + + Args: + agent: Agent role (analyst, architect, developer, reviewer, tester) + repo: Repository name + task_content: Optional task content to write to task file + + Returns: + agent_run_id from DB + """ + config = self.AGENT_CONFIGS.get(agent) + if not config: + raise ValueError(f"Unknown agent: {agent}") + + repo_path = os.path.join(settings.repos_dir, repo) + if not os.path.isdir(repo_path): + raise FileNotFoundError(f"Repo not found: {repo_path}") + + # Write task file if content provided + if task_content: + task_path = os.path.join(repo_path, config["task_file"]) + with open(task_path, "w") as f: + f.write(task_content) + + # Record run in DB + conn = get_db() + cursor = conn.execute( + "INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)", + (agent,), + ) + run_id = cursor.lastrowid + conn.commit() + + # Prepare output log + output_path = f"/app/data/runs/{run_id}.log" + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + # Build shell command + cmd = ( + f'cd {repo_path} && {settings.claude_bin} --print ' + f'"$(cat {config["task_file"]})" ' + f'--system-prompt "$(cat {config["system_prompt"]})" ' + f'--allowedTools {config["allowed_tools"]}' + ) + + # Launch as background process + with open(output_path, "w") as log_file: + subprocess.Popen( + ["bash", "-c", cmd], + stdout=log_file, + stderr=subprocess.STDOUT, + cwd=repo_path, + ) + + # Update DB with output path + conn.execute( + "UPDATE agent_runs SET output_path = ? WHERE id = ?", + (output_path, run_id), + ) + conn.commit() + conn.close() + + return run_id + + +launcher = AgentLauncher() diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..72f345d --- /dev/null +++ b/src/config.py @@ -0,0 +1,28 @@ +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + # Plane + plane_api_url: str = "http://localhost:8091" + plane_api_token: str = "" + plane_workspace_slug: str = "" + plane_webhook_secret: str = "" + + # Gitea + gitea_url: str = "http://localhost:3000" + gitea_token: str = "" + gitea_webhook_secret: str = "" + + # Claude CLI + claude_bin: str = "/usr/bin/claude" + repos_dir: str = "/home/slin/repos" + + # DB + db_path: str = "/app/data/orchestrator.db" + + class Config: + env_prefix = "ORCH_" + env_file = ".env" + + +settings = Settings() diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..904500a --- /dev/null +++ b/src/db.py @@ -0,0 +1,42 @@ +import sqlite3 +from .config import settings + + +def get_db() -> sqlite3.Connection: + conn = sqlite3.connect(settings.db_path) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + conn = get_db() + conn.executescript(""" + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT DEFAULT (datetime('now')), + source TEXT NOT NULL, + event_type TEXT NOT NULL, + payload TEXT NOT NULL, + processed INTEGER DEFAULT 0 + ); + CREATE TABLE IF NOT EXISTS tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plane_id TEXT, + repo TEXT NOT NULL, + branch TEXT, + stage TEXT DEFAULT 'created', + agent_running TEXT, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ); + CREATE TABLE IF NOT EXISTS agent_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id INTEGER REFERENCES tasks(id), + agent TEXT NOT NULL, + started_at TEXT DEFAULT (datetime('now')), + finished_at TEXT, + exit_code INTEGER, + output_path TEXT + ); + """) + conn.close() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..6e60511 --- /dev/null +++ b/src/main.py @@ -0,0 +1,32 @@ +from fastapi import FastAPI +from contextlib import asynccontextmanager +from .db import init_db +from .webhooks.plane import router as plane_router +from .webhooks.gitea import router as gitea_router + + +@asynccontextmanager +async def lifespan(app: FastAPI): + init_db() + yield + + +app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan) +app.include_router(plane_router, prefix="/webhook") +app.include_router(gitea_router, prefix="/webhook") + + +@app.get("/health") +async def health(): + return {"status": "ok", "service": "orchestrator"} + + +@app.get("/status") +async def status(): + from .db import get_db + conn = get_db() + tasks = conn.execute( + "SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10" + ).fetchall() + conn.close() + return {"active_tasks": [dict(t) for t in tasks]} diff --git a/src/qg/__init__.py b/src/qg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/qg/checks.py b/src/qg/checks.py new file mode 100644 index 0000000..3fe7fa8 --- /dev/null +++ b/src/qg/checks.py @@ -0,0 +1,26 @@ +# Quality Gate checks placeholder +# Will be expanded as pipeline matures + + +def check_analysis_complete(task_id: int) -> bool: + """Check if analysis artifacts exist.""" + # TODO: verify .task-arch.md exists in repo + return True + + +def check_architecture_approved(task_id: int) -> bool: + """Check if architecture was approved in Plane.""" + # TODO: check Plane comment for :approved: + return False + + +def check_ci_green(repo: str, branch: str) -> bool: + """Check if CI status is green for branch.""" + # TODO: query Gitea commit status API + return False + + +def check_review_approved(repo: str, pr_number: int) -> bool: + """Check if PR has approved review.""" + # TODO: query Gitea PR reviews API + return False diff --git a/src/webhooks/__init__.py b/src/webhooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py new file mode 100644 index 0000000..7a02c66 --- /dev/null +++ b/src/webhooks/gitea.py @@ -0,0 +1,54 @@ +from fastapi import APIRouter, Request +import json +from ..db import get_db + +router = APIRouter() + + +@router.post("/gitea") +async def gitea_webhook(request: Request): + """Handle Gitea webhook events.""" + body = await request.body() + payload = json.loads(body) + + # Log event + conn = get_db() + event_type = request.headers.get("X-Gitea-Event", "unknown") + conn.execute( + "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", + ("gitea", event_type, body.decode()), + ) + conn.commit() + + if event_type == "push": + await handle_push(payload, conn) + elif event_type == "pull_request": + await handle_pr(payload, conn) + elif event_type == "status": + await handle_ci_status(payload, conn) + + conn.close() + return {"status": "accepted"} + + +async def handle_push(payload: dict, conn): + """Push event — log for now.""" + pass + + +async def handle_pr(payload: dict, conn): + """PR event — check reviews, CI status.""" + action = payload.get("action", "") + pr = payload.get("pull_request", {}) + + if action == "reviewed" and pr.get("state") == "approved": + # TODO: QG-5 check -> launch Tester + pass + + +async def handle_ci_status(payload: dict, conn): + """CI status update — check if all green -> advance.""" + state = payload.get("state", "") + if state == "success": + # TODO: Check all required contexts green -> advance stage + pass diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py new file mode 100644 index 0000000..adabd62 --- /dev/null +++ b/src/webhooks/plane.py @@ -0,0 +1,49 @@ +from fastapi import APIRouter, Request +import json +from ..db import get_db + +router = APIRouter() + + +@router.post("/plane") +async def plane_webhook(request: Request): + """Handle Plane webhook events.""" + body = await request.body() + payload = json.loads(body) + + # Log event + conn = get_db() + conn.execute( + "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", + ("plane", payload.get("event", "unknown"), body.decode()), + ) + conn.commit() + + event = payload.get("event") + data = payload.get("data", {}) + + if event == "work_item.created": + await handle_work_item_created(data, conn) + elif event == "comment.created": + await handle_comment(data, conn) + + conn.close() + return {"status": "accepted"} + + +async def handle_work_item_created(data: dict, conn): + """New work item -> create task record.""" + plane_id = data.get("id", "") + conn.execute( + "INSERT INTO tasks (plane_id, repo, stage) VALUES (?, ?, ?)", + (plane_id, "enduro-trails", "analysis"), + ) + conn.commit() + + +async def handle_comment(data: dict, conn): + """Check for :approved: reaction -> advance stage.""" + comment_body = data.get("comment", "") + if ":approved:" in comment_body: + # TODO: Determine which task, advance QG + pass diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py new file mode 100644 index 0000000..0566f6a --- /dev/null +++ b/tests/test_webhooks.py @@ -0,0 +1,65 @@ +import pytest +from fastapi.testclient import TestClient +import os +import tempfile + +# Override DB path before importing app +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orchestrator.db") + +from src.main import app + +client = TestClient(app) + + +def test_health(): + resp = client.get("/health") + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + assert resp.json()["service"] == "orchestrator" + + +def test_plane_webhook_accepts(): + resp = client.post("/webhook/plane", json={ + "event": "work_item.created", + "data": {"id": "test-123", "name": "Test task", "project": "proj-1"} + }) + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + + +def test_plane_webhook_comment(): + resp = client.post("/webhook/plane", json={ + "event": "comment.created", + "data": {"comment": "LGTM :approved:"} + }) + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + + +def test_gitea_webhook_push(): + resp = client.post( + "/webhook/gitea", + json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}}, + headers={"X-Gitea-Event": "push"} + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + + +def test_gitea_webhook_pr(): + resp = client.post( + "/webhook/gitea", + json={ + "action": "reviewed", + "pull_request": {"state": "approved", "number": 1} + }, + headers={"X-Gitea-Event": "pull_request"} + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "accepted" + + +def test_status_endpoint(): + resp = client.get("/status") + assert resp.status_code == 200 + assert "active_tasks" in resp.json()