From b545665e2d3471deaed3367d789065eae2c47b1d Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Fri, 22 May 2026 01:57:02 +0300 Subject: [PATCH] feat: full pipeline fixes - CI status branch lookup, review webhook routing, auto-advance, plane sync - handle_ci_status: fallback git branch -r --contains when branches[] empty - webhook router: handle pull_request_approved event type - handle_pr: map review.type to review.state for new Gitea format - launcher: auto-advance stage after agent completion (_try_advance_stage) - plane_sync: notify Plane on stage changes - stages.py: stage machine with QG definitions - notifications.py: stage change notifications - safe.directory fix for container git operations --- Dockerfile | 11 ++ docker-compose.yml | 13 +- docs/SETUP_WEBHOOKS.md | 163 ++++++++++++++++++++++++ requirements.txt | 1 + src/agents/launcher.py | 252 ++++++++++++++++++++++++++++++++++--- src/config.py | 8 +- src/db.py | 56 +++++++++ src/main.py | 7 ++ src/notifications.py | 28 +++++ src/plane_sync.py | 129 +++++++++++++++++++ src/qg/checks.py | 157 ++++++++++++++++++++--- src/stages.py | 45 +++++++ src/webhooks/gitea.py | 277 +++++++++++++++++++++++++++++++++++++---- src/webhooks/plane.py | 260 +++++++++++++++++++++++++++++++++++--- tests/test_qg.py | 188 ++++++++++++++++++++++++++++ tests/test_webhooks.py | 236 +++++++++++++++++++++++++++++++---- 16 files changed, 1729 insertions(+), 102 deletions(-) create mode 100644 docs/SETUP_WEBHOOKS.md create mode 100644 src/notifications.py create mode 100644 src/plane_sync.py create mode 100644 src/stages.py create mode 100644 tests/test_qg.py diff --git a/Dockerfile b/Dockerfile index 0b02d3a..3544f75 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,15 @@ FROM python:3.12-slim + +# Install Docker CLI for sibling container launches +RUN apt-get update && \ + apt-get install -y --no-install-recommends ca-certificates curl gnupg git && \ + install -m 0755 -d /etc/apt/keyrings && \ + curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg && \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian bookworm stable" > /etc/apt/sources.list.d/docker.list && \ + apt-get update && \ + apt-get install -y --no-install-recommends docker-ce-cli && \ + rm -rf /var/lib/apt/lists/* + WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt diff --git a/docker-compose.yml b/docker-compose.yml index 7a88c68..7a8a154 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,11 +3,18 @@ services: build: . container_name: orchestrator restart: unless-stopped - ports: - - "127.0.0.1:8500:8500" + network_mode: host volumes: - ./data:/app/data - - /home/slin/repos:/repos:ro + - /home/slin/repos:/repos + - /var/run/docker.sock:/var/run/docker.sock + - /usr/lib/node_modules/@anthropic-ai/claude-code:/opt/claude-code:ro + - /usr/bin/node:/usr/bin/node:ro + - /home/slin/.claude:/home/slin/.claude + - /home/slin/.claude.json:/home/slin/.claude.json:ro env_file: .env environment: - ORCH_REPOS_DIR=/repos + - ORCH_HOST_REPOS_DIR=/home/slin/repos + group_add: + - "999" diff --git a/docs/SETUP_WEBHOOKS.md b/docs/SETUP_WEBHOOKS.md new file mode 100644 index 0000000..470396d --- /dev/null +++ b/docs/SETUP_WEBHOOKS.md @@ -0,0 +1,163 @@ +# Webhook Setup: Plane + Gitea → Orchestrator + +## Архитектура + +``` +Gitea (push/PR/CI) ──→ Nginx proxy ──→ Orchestrator /webhook/gitea +Plane (work_item/comment) ──→ Nginx proxy ──→ Orchestrator /webhook/plane +``` + +External URL: `https://openclaw.mva154.duckdns.org/orchestrator/` +Internal URL: `http://127.0.0.1:8500/` + +--- + +## Gitea Webhook + +**Создан автоматически через API.** + +- URL: `https://openclaw.mva154.duckdns.org/orchestrator/webhook/gitea` +- Events: `push`, `pull_request`, `status` +- Secret: значение `ORCH_GITEA_WEBHOOK_SECRET` в `.env` +- Signature header: `X-Gitea-Signature` (HMAC-SHA256 hex digest) + +### Проверка + +```bash +GITEA_TOKEN=$(grep ORCH_GITEA_TOKEN /home/slin/repos/orchestrator/.env | cut -d= -f2) +curl -s "http://localhost:3000/api/v1/repos/admin/enduro-trails/hooks" \ + -H "Authorization: token ${GITEA_TOKEN}" | python3 -m json.tool +``` + +### Пересоздание (если нужно) + +```bash +GITEA_WEBHOOK_SECRET=$(openssl rand -hex 20) +# Обновить в .env: ORCH_GITEA_WEBHOOK_SECRET= + +curl -X POST "http://localhost:3000/api/v1/repos/admin/enduro-trails/hooks" \ + -H "Authorization: token ${GITEA_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "gitea", + "active": true, + "config": { + "url": "https://openclaw.mva154.duckdns.org/orchestrator/webhook/gitea", + "content_type": "json", + "secret": "'${GITEA_WEBHOOK_SECRET}'" + }, + "events": ["push", "pull_request", "status"], + "branch_filter": "*" + }' +``` + +--- + +## Plane Webhook + +**Создан напрямую в PostgreSQL** (Plane CE не экспортирует webhook API через внешний /api/v1/). + +- URL: `https://openclaw.mva154.duckdns.org/orchestrator/webhook/plane` +- Events: `issue` (work_item.created), `issue_comment` (comment.created) +- Secret: значение `ORCH_PLANE_WEBHOOK_SECRET` в `.env` +- Signature header: `X-Plane-Signature` (HMAC-SHA256 hex digest) + +### Проверка + +```bash +docker exec -e PGPASSWORD=plane plane-app-plane-db-1 psql -U plane -d plane -c \ + "SELECT id, url, is_active FROM webhooks;" +``` + +### Ручная настройка через UI (альтернатива) + +1. Открыть `https://plane.mva154.duckdns.org` +2. Workspace Settings → Webhooks → Add Webhook +3. URL: `https://openclaw.mva154.duckdns.org/orchestrator/webhook/plane` +4. Secret: значение из `ORCH_PLANE_WEBHOOK_SECRET` в `.env` +5. Events: Issue, Issue Comment +6. Save + +### Пересоздание через SQL + +```bash +PLANE_WEBHOOK_SECRET=$(openssl rand -hex 20) +# Обновить в .env: ORCH_PLANE_WEBHOOK_SECRET= + +WORKSPACE_ID=$(docker exec -e PGPASSWORD=plane plane-app-plane-db-1 psql -U plane -d plane -t -A -c \ + "SELECT id FROM workspaces WHERE slug='ag_proj'") + +WEBHOOK_ID=$(cat /proc/sys/kernel/random/uuid) + +docker exec -e PGPASSWORD=plane plane-app-plane-db-1 psql -U plane -d plane -c " +INSERT INTO webhooks (id, created_at, updated_at, deleted_at, workspace_id, url, is_active, secret_key, project, issue, module, cycle, issue_comment, is_internal, version) +VALUES ('${WEBHOOK_ID}', NOW(), NOW(), NULL, '${WORKSPACE_ID}', + 'https://openclaw.mva154.duckdns.org/orchestrator/webhook/plane', + true, '${PLANE_WEBHOOK_SECRET}', true, true, false, false, true, false, 'v1'); +" +``` + +--- + +## HMAC Signature Verification + +Оба handler'а проверяют подпись: +- Если secret пустой в `.env` — верификация пропускается (для dev/debug) +- Если secret задан — запрос без валидной подписи получает `401 Unauthorized` + +### Формат подписи + +| Source | Header | Algorithm | Format | +|--------|--------|-----------|--------| +| Gitea | `X-Gitea-Signature` | HMAC-SHA256 | hex digest (без префикса) | +| Plane | `X-Plane-Signature` | HMAC-SHA256 | hex digest | + +### Тест подписи вручную + +```bash +SECRET=$(grep ORCH_GITEA_WEBHOOK_SECRET /home/slin/repos/orchestrator/.env | cut -d= -f2) +BODY='{"ref":"refs/heads/test","repository":{"name":"enduro-trails"},"commits":[]}' +SIG=$(echo -n "${BODY}" | openssl dgst -sha256 -hmac "${SECRET}" | awk '{print $NF}') + +curl -X POST http://localhost:8500/webhook/gitea \ + -H "Content-Type: application/json" \ + -H "X-Gitea-Event: push" \ + -H "X-Gitea-Signature: ${SIG}" \ + -d "${BODY}" +# Expected: {"status":"accepted"} +``` + +--- + +## Переменные окружения (.env) + +| Переменная | Описание | +|-----------|----------| +| `ORCH_GITEA_WEBHOOK_SECRET` | HMAC secret для Gitea webhook | +| `ORCH_PLANE_WEBHOOK_SECRET` | HMAC secret для Plane webhook | +| `ORCH_GITEA_TOKEN` | API token для Gitea | +| `ORCH_PLANE_API_TOKEN` | API token для Plane | + +--- + +## Troubleshooting + +```bash +# Логи Orchestrator +docker logs orchestrator --tail 50 2>&1 | grep -i "webhook\|signature\|401" + +# События в БД +docker exec orchestrator python3 -c " +import sqlite3 +conn = sqlite3.connect('/app/data/orchestrator.db') +for r in conn.execute('SELECT id, source, event_type, timestamp FROM events ORDER BY id DESC LIMIT 10').fetchall(): + print(r) +" + +# Gitea webhook delivery history +# Gitea UI → Settings → Webhooks → click webhook → Recent Deliveries +``` + +--- + +*Создано: 2026-05-21 | Автор: Dev-агент* diff --git a/requirements.txt b/requirements.txt index 92ef7b6..4025b1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ fastapi==0.115.0 uvicorn[standard]==0.30.0 pydantic-settings==2.5.0 httpx==0.27.0 +pytest==8.3.3 diff --git a/src/agents/launcher.py b/src/agents/launcher.py index 58a47a0..b589bf5 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -1,11 +1,20 @@ import subprocess import os +import logging +import threading +import signal from ..config import settings -from ..db import get_db +from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage +from ..qg.checks import QG_CHECKS +from ..notifications import notify_stage_change, notify_qg_failure +from ..plane_sync import notify_stage_change as plane_notify_stage + +logger = logging.getLogger("orchestrator.launcher") class AgentLauncher: - """Launch Claude CLI agents for specific tasks.""" + """Launch Claude CLI agents directly (binary mounted into container).""" AGENT_CONFIGS = { "analyst": { @@ -35,7 +44,10 @@ class AgentLauncher: }, } - def launch(self, agent: str, repo: str, task_content: str = None) -> int: + CLAUDE_BIN = "/opt/claude-code/bin/claude.exe" + AGENT_TIMEOUT = 1800 # 30 minutes + + def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int: """ Launch a Claude CLI agent. @@ -43,6 +55,7 @@ class AgentLauncher: agent: Agent role (analyst, architect, developer, reviewer, tester) repo: Repository name task_content: Optional task content to write to task file + task_id: Optional task ID to associate with this run Returns: agent_run_id from DB @@ -51,44 +64,60 @@ class AgentLauncher: if not config: raise ValueError(f"Unknown agent: {agent}") - repo_path = os.path.join(settings.repos_dir, repo) - if not os.path.isdir(repo_path): - raise FileNotFoundError(f"Repo not found: {repo_path}") + # Container-local path (repos mounted at /repos) + local_repo_path = os.path.join(settings.repos_dir, repo) + # Host path (for docker run write operations) + host_repo_path = os.path.join(settings.host_repos_dir, repo) + + if not os.path.isdir(local_repo_path): + raise FileNotFoundError(f"Repo not found: {local_repo_path}") # Write task file if content provided if task_content: - task_path = os.path.join(repo_path, config["task_file"]) - with open(task_path, "w") as f: - f.write(task_content) + self._write_task_file(host_repo_path, config["task_file"], task_content) # Record run in DB conn = get_db() cursor = conn.execute( - "INSERT INTO agent_runs (task_id, agent) VALUES (NULL, ?)", - (agent,), + "INSERT INTO agent_runs (task_id, agent) VALUES (?, ?)", + (task_id, agent), ) run_id = cursor.lastrowid conn.commit() - # Prepare output log + # Prepare output log path output_path = f"/app/data/runs/{run_id}.log" os.makedirs(os.path.dirname(output_path), exist_ok=True) - # Build shell command + # Build the claude command + task_file = config["task_file"] + system_prompt = config["system_prompt"] + allowed_tools = config["allowed_tools"] + cmd = ( - f'cd {repo_path} && {settings.claude_bin} --print ' - f'"$(cat {config["task_file"]})" ' - f'--system-prompt "$(cat {config["system_prompt"]})" ' - f'--allowedTools {config["allowed_tools"]}' + f'cd {local_repo_path} && ' + f'{self.CLAUDE_BIN} --print ' + f'"$(cat {task_file})" ' + f'--system-prompt "$(cat {system_prompt})" ' + f'--allowedTools {allowed_tools}' ) + logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}") + # Launch as background process with open(output_path, "w") as log_file: - subprocess.Popen( + proc = subprocess.Popen( ["bash", "-c", cmd], stdout=log_file, stderr=subprocess.STDOUT, - cwd=repo_path, + env={ + **os.environ, + "HOME": "/home/slin", + "GIT_AUTHOR_NAME": "claude-bot", + "GIT_AUTHOR_EMAIL": "claude-bot@mva154.local", + "GIT_COMMITTER_NAME": "claude-bot", + "GIT_COMMITTER_EMAIL": "claude-bot@mva154.local", + }, ) # Update DB with output path @@ -99,7 +128,192 @@ class AgentLauncher: conn.commit() conn.close() + # Start timeout watchdog + t = threading.Thread( + target=self._watchdog, + args=(proc.pid, run_id), + daemon=True, + ) + t.start() + + # Start monitor thread (waits for completion, commits, pushes) + task_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None + agent_branch = task_row[0] if task_row else "main" + m = threading.Thread( + target=self._monitor_agent, + args=(proc, run_id, agent, repo, agent_branch), + daemon=True, + ) + m.start() + + logger.info(f"Agent '{agent}' launched, pid={proc.pid}, run_id={run_id}") return run_id + def _watchdog(self, pid: int, run_id: int, timeout: int = None): + """Kill agent if it exceeds timeout.""" + import time + if timeout is None: + timeout = self.AGENT_TIMEOUT + time.sleep(timeout) + try: + os.kill(pid, signal.SIGKILL) + logger.warning(f"Agent run_id={run_id} killed after {timeout}s timeout") + conn = get_db() + conn.execute( + "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-9 WHERE id=?", + (run_id,), + ) + conn.commit() + conn.close() + except ProcessLookupError: + pass # Already finished + + def _monitor_agent(self, proc, run_id, agent, repo, branch): + """Wait for agent to finish, commit+push results, update DB.""" + exit_code = proc.wait() + logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}") + + # Update DB + conn = get_db() + conn.execute( + "UPDATE agent_runs SET finished_at=datetime('now'), exit_code=? WHERE id=?", + (exit_code, run_id), + ) + conn.commit() + conn.close() + + # Commit and push any changes + repo_path = os.path.join(settings.repos_dir, repo) + try: + git_env = { + **os.environ, + "HOME": "/home/slin", + "GIT_AUTHOR_NAME": "claude-bot", + "GIT_AUTHOR_EMAIL": "claude-bot@mva154.local", + "GIT_COMMITTER_NAME": "claude-bot", + "GIT_COMMITTER_EMAIL": "claude-bot@mva154.local", + } + result = subprocess.run( + ["git", "-C", repo_path, "status", "--porcelain"], + capture_output=True, text=True, timeout=10, env=git_env + ) + if result.stdout.strip(): + # Add docs/ always + subprocess.run( + ["git", "-C", repo_path, "add", "docs/"], + capture_output=True, text=True, timeout=10, env=git_env + ) + # Add src/ and tests/ for developer + if agent == "developer": + subprocess.run( + ["git", "-C", repo_path, "add", "src/", "tests/"], + capture_output=True, text=True, timeout=10, env=git_env + ) + # Commit + commit_result = subprocess.run( + ["git", "-C", repo_path, "commit", "-m", + f"{agent}(ET): auto-commit from {agent} run_id={run_id}"], + capture_output=True, text=True, timeout=30, env=git_env + ) + if commit_result.returncode == 0: + push_result = subprocess.run( + ["git", "-C", repo_path, "push", "origin", branch], + capture_output=True, text=True, timeout=60, env=git_env + ) + if push_result.returncode == 0: + logger.info(f"Agent run_id={run_id}: committed and pushed to {branch}") + else: + logger.error(f"Agent run_id={run_id}: push failed: {push_result.stderr}") + else: + logger.warning(f"Agent run_id={run_id}: commit failed: {commit_result.stderr}") + else: + logger.info(f"Agent run_id={run_id}: no changes to commit") + except Exception as e: + logger.error(f"Agent run_id={run_id}: post-run git failed: {e}") + + # Auto-advance stage if agent finished successfully and QG passes + if exit_code == 0: + self._try_advance_stage(run_id, agent, repo, branch) + + def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str): + """After agent finishes successfully, check QG and advance stage if possible.""" + try: + conn = get_db() + task_row = conn.execute( + "SELECT id, stage, work_item_id FROM tasks WHERE repo=? AND branch=?", + (repo, branch), + ).fetchone() + conn.close() + if not task_row: + return + + task_id, current_stage, work_item_id = task_row + qg_name = get_qg_for_stage(current_stage) + next_stage = get_next_stage(current_stage) + + if not next_stage: + return + + # Run QG check if defined + if qg_name and qg_name in QG_CHECKS: + check_fn = QG_CHECKS[qg_name] + if qg_name == "check_review_approved": + # Skip — handled by PR webhook + return + elif qg_name == "check_ci_green": + passed, reason = check_fn(repo, branch) + elif qg_name == "check_tests_passed": + passed, reason = check_fn(repo, work_item_id or "") + else: + passed, reason = check_fn(repo, work_item_id or "") + + if not passed: + logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}") + return + elif qg_name: + return + + # Advance stage + update_task_stage(task_id, next_stage) + notify_stage_change(task_id, current_stage, next_stage) + plane_notify_stage(work_item_id, current_stage, next_stage) + logger.info(f"Task {task_id}: {current_stage} -> {next_stage} (auto-advance after {agent})") + + # Launch next agent if defined + next_agent = get_agent_for_stage(next_stage) + if next_agent: + task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" + new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})") + + except Exception as e: + logger.error(f"Auto-advance failed for run_id={run_id}: {e}") + + def _write_task_file(self, host_repo_path: str, task_file: str, content: str): + """Write task file to host repo via docker run with stdin.""" + full_path = os.path.join(host_repo_path, task_file) + # Use docker run with stdin to write content to the file + cmd = [ + "docker", "run", "--rm", "-i", + "-v", f"{host_repo_path}:{host_repo_path}", + "-w", host_repo_path, + "python:3.12-slim", + "bash", "-c", f"cat > {full_path}", + ] + try: + result = subprocess.run( + cmd, + input=content, + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + logger.error(f"Failed to write task file: {result.stderr}") + raise RuntimeError(f"Failed to write task file: {result.stderr}") + logger.info(f"Task file written: {full_path}") + except subprocess.TimeoutExpired: + raise RuntimeError("Timeout writing task file") + launcher = AgentLauncher() diff --git a/src/config.py b/src/config.py index 72f345d..ba9f40d 100644 --- a/src/config.py +++ b/src/config.py @@ -7,15 +7,19 @@ class Settings(BaseSettings): plane_api_token: str = "" plane_workspace_slug: str = "" plane_webhook_secret: str = "" + plane_project_id: str = "" # Gitea gitea_url: str = "http://localhost:3000" gitea_token: str = "" gitea_webhook_secret: str = "" + gitea_owner: str = "admin" + default_repo: str = "enduro-trails" # Claude CLI - claude_bin: str = "/usr/bin/claude" - repos_dir: str = "/home/slin/repos" + claude_bin: str = "/opt/claude-code/bin/claude.exe" + repos_dir: str = "/repos" + host_repos_dir: str = "/home/slin/repos" # DB db_path: str = "/app/data/orchestrator.db" diff --git a/src/db.py b/src/db.py index 904500a..d490856 100644 --- a/src/db.py +++ b/src/db.py @@ -22,6 +22,7 @@ def init_db(): CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, plane_id TEXT, + work_item_id TEXT, repo TEXT NOT NULL, branch TEXT, stage TEXT DEFAULT 'created', @@ -40,3 +41,58 @@ def init_db(): ); """) conn.close() + + +def get_task_by_plane_id(plane_id: str) -> dict | None: + """Find task by Plane work item ID (checks plane_id and plane_issue_id).""" + conn = get_db() + row = conn.execute( + "SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id) + ).fetchone() + conn.close() + if row: + return dict(row) + return None + + +def get_task_by_repo_branch(repo: str, branch: str) -> dict | None: + """Find task by repo and branch name.""" + conn = get_db() + row = conn.execute( + "SELECT * FROM tasks WHERE repo = ? AND branch = ?", (repo, branch) + ).fetchone() + conn.close() + if row: + return dict(row) + return None + + +def update_task_stage(task_id: int, stage: str): + """Update task stage and timestamp.""" + conn = get_db() + conn.execute( + "UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?", + (stage, task_id), + ) + conn.commit() + conn.close() + + +def get_next_work_item_id(repo: str) -> str: + """Generate next work item ID (e.g., ET-003).""" + conn = get_db() + row = conn.execute( + "SELECT work_item_id FROM tasks WHERE repo = ? AND work_item_id IS NOT NULL ORDER BY id DESC LIMIT 1", + (repo,), + ).fetchone() + conn.close() + + if row and row["work_item_id"]: + # Parse ET-003 -> 3, increment + prefix, num = row["work_item_id"].rsplit("-", 1) + next_num = int(num) + 1 + else: + prefix = "ET" + next_num = 1 + + return f"{prefix}-{next_num:03d}" diff --git a/src/main.py b/src/main.py index 6e60511..92c6c34 100644 --- a/src/main.py +++ b/src/main.py @@ -1,9 +1,16 @@ from fastapi import FastAPI from contextlib import asynccontextmanager +import logging from .db import init_db from .webhooks.plane import router as plane_router from .webhooks.gitea import router as gitea_router +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) + @asynccontextmanager async def lifespan(app: FastAPI): diff --git a/src/notifications.py b/src/notifications.py new file mode 100644 index 0000000..2b78fb6 --- /dev/null +++ b/src/notifications.py @@ -0,0 +1,28 @@ +"""Notifications and logging for orchestrator events.""" + +import logging + +logger = logging.getLogger("orchestrator") + + +def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None): + """Log stage transition.""" + msg = f"Task {task_id}: {old_stage} → {new_stage}" + if agent: + msg += f" (launching {agent})" + logger.info(msg) + + +def notify_qg_failure(task_id: int, stage: str, check: str, reason: str): + """Log QG check failure.""" + logger.warning(f"Task {task_id}: QG failed at stage '{stage}', check={check}: {reason}") + + +def notify_agent_finished(run_id: int, agent: str, exit_code: int): + """Log agent completion.""" + logger.info(f"Agent run {run_id} ({agent}) finished with exit code {exit_code}") + + +def notify_error(task_id: int, error: str): + """Log error for a task.""" + logger.error(f"Task {task_id}: ERROR — {error}") diff --git a/src/plane_sync.py b/src/plane_sync.py new file mode 100644 index 0000000..4d1e5dc --- /dev/null +++ b/src/plane_sync.py @@ -0,0 +1,129 @@ +"""Plane API sync — update issue state and add comments.""" + +import logging +import httpx +from .config import settings + +logger = logging.getLogger("orchestrator.plane_sync") + +PLANE_BASE = f"{settings.plane_api_url}/api/v1" +PLANE_HEADERS = {"X-API-Key": settings.plane_api_token} +WORKSPACE = settings.plane_workspace_slug +PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c" + +# Map orchestrator stages to Plane states +STAGE_TO_STATE = { + "created": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", # Todo + "analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "architecture": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "development": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "review": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "testing": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "deploy": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress + "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", # Done +} + + +def find_issue_id(work_item_id: str) -> str | None: + """Find Plane issue UUID by work_item_id (e.g. 'ET-002').""" + # Primary: lookup from DB (plane_issue_id column) + try: + from .db import get_db + conn = get_db() + row = conn.execute( + "SELECT plane_issue_id FROM tasks WHERE work_item_id = ? AND plane_issue_id IS NOT NULL", + (work_item_id,) + ).fetchone() + if row and row[0]: + return row[0] + except Exception as e: + logger.debug(f"DB lookup failed for {work_item_id}: {e}") + + # Fallback: search via Plane API + url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/" + try: + # First try search by work_item_id + resp = httpx.get(url, headers=PLANE_HEADERS, params={"search": work_item_id}, timeout=10) + resp.raise_for_status() + data = resp.json() + results = data.get("results", data if isinstance(data, list) else []) + for issue in results: + seq = issue.get("sequence_id") + identifier = f"ET-{seq:03d}" if seq else "" + if identifier == work_item_id or work_item_id in issue.get("name", ""): + return issue["id"] + # Fallback: get all issues and match by sequence_id number + if work_item_id.startswith("ET-"): + try: + target_num = int(work_item_id.split("-")[1]) + except (IndexError, ValueError): + target_num = None + if target_num: + resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10) + resp2.raise_for_status() + data2 = resp2.json() + results2 = data2.get("results", data2 if isinstance(data2, list) else []) + for issue in results2: + if issue.get("sequence_id") == target_num: + return issue["id"] + except Exception as e: + logger.error(f"Failed to find issue for {work_item_id}: {e}") + return None + + +def update_issue_state(work_item_id: str, stage: str): + """Update Plane issue state based on orchestrator stage.""" + state_id = STAGE_TO_STATE.get(stage) + if not state_id: + return + + issue_id = find_issue_id(work_item_id) + if not issue_id: + logger.warning(f"Issue not found in Plane for {work_item_id}") + return + + url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/" + try: + resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) + resp.raise_for_status() + logger.info(f"Plane: {work_item_id} state -> {stage} ({state_id[:8]}...)") + except Exception as e: + logger.error(f"Failed to update Plane state for {work_item_id}: {e}") + + +def add_comment(work_item_id: str, text: str): + """Add a comment to Plane issue.""" + issue_id = find_issue_id(work_item_id) + if not issue_id: + logger.warning(f"Issue not found in Plane for {work_item_id}, skipping comment") + return + + url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/comments/" + html = f"

{text}

" + try: + resp = httpx.post(url, headers=PLANE_HEADERS, json={"comment_html": html}, timeout=10) + resp.raise_for_status() + logger.info(f"Plane: comment added to {work_item_id}") + except Exception as e: + logger.error(f"Failed to add comment to {work_item_id}: {e}") + + +def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None): + """Notify Plane about stage transition.""" + update_issue_state(work_item_id, new_stage) + + msg = f"🔄 Stage: {old_stage} → {new_stage}" + if agent: + msg += f" (launching {agent})" + add_comment(work_item_id, msg) + + +def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str): + """Notify Plane about QG failure.""" + add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check} — {reason}") + + +def notify_done(work_item_id: str): + """Mark issue as Done in Plane.""" + update_issue_state(work_item_id, "done") + add_comment(work_item_id, "✅ Task completed! PR merged and deployed.") diff --git a/src/qg/checks.py b/src/qg/checks.py index 3fe7fa8..cfd3c42 100644 --- a/src/qg/checks.py +++ b/src/qg/checks.py @@ -1,26 +1,147 @@ -# Quality Gate checks placeholder -# Will be expanded as pipeline matures +"""Quality Gate checks — real implementations using Gitea/Plane API and filesystem.""" + +import os +import logging +import httpx +from ..config import settings + +logger = logging.getLogger("orchestrator.qg") + +# Shared httpx client config +GITEA_HEADERS = {"Authorization": f"token {settings.gitea_token}"} +GITEA_BASE = f"{settings.gitea_url}/api/v1" -def check_analysis_complete(task_id: int) -> bool: - """Check if analysis artifacts exist.""" - # TODO: verify .task-arch.md exists in repo - return True +def check_analysis_complete(repo: str, work_item_id: str) -> tuple[bool, str]: + """ + Check if analysis artifacts exist in the repo branch. + Required files: + - docs/work-items//01-brd.md + - docs/work-items//02-trz.md + - docs/work-items//03-acceptance-criteria.md + - docs/work-items//04-test-plan.yaml + """ + required_files = [ + f"docs/work-items/{work_item_id}/01-brd.md", + f"docs/work-items/{work_item_id}/02-trz.md", + f"docs/work-items/{work_item_id}/03-acceptance-criteria.md", + f"docs/work-items/{work_item_id}/04-test-plan.yaml", + ] + + repo_path = os.path.join(settings.repos_dir, repo) + missing = [] + + for f in required_files: + full_path = os.path.join(repo_path, f) + if not os.path.isfile(full_path): + missing.append(f) + + if missing: + return False, f"Missing files: {', '.join(missing)}" + return True, "All analysis artifacts present" -def check_architecture_approved(task_id: int) -> bool: - """Check if architecture was approved in Plane.""" - # TODO: check Plane comment for :approved: - return False +def check_architecture_done(repo: str, work_item_id: str) -> tuple[bool, str]: + """ + Check if architecture artifacts exist. + Required: docs/work-items//06-adr/ (at least 1 file) + OR: docs/work-items//07-infra-requirements.md + """ + repo_path = os.path.join(settings.repos_dir, repo) + + adr_dir = os.path.join(repo_path, f"docs/work-items/{work_item_id}/06-adr") + infra_file = os.path.join(repo_path, f"docs/work-items/{work_item_id}/07-infra-requirements.md") + + if os.path.isdir(adr_dir) and len(os.listdir(adr_dir)) > 0: + return True, "ADR directory exists with files" + + if os.path.isfile(infra_file): + return True, "Infra requirements file exists" + + return False, "No ADR directory or infra-requirements.md found" -def check_ci_green(repo: str, branch: str) -> bool: - """Check if CI status is green for branch.""" - # TODO: query Gitea commit status API - return False +def check_ci_green(repo: str, branch: str) -> tuple[bool, str]: + """ + Check if CI status is green for branch via Gitea API. + GET /repos/{owner}/{repo}/commits/{branch}/status + """ + owner = settings.gitea_owner + url = f"{GITEA_BASE}/repos/{owner}/{repo}/commits/{branch}/status" + + try: + resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) + if resp.status_code == 404: + return False, f"Branch '{branch}' not found or no status" + resp.raise_for_status() + data = resp.json() + state = data.get("state", "unknown") + if state == "success": + return True, "CI green" + return False, f"CI state: {state}" + except httpx.HTTPError as e: + logger.error(f"Gitea API error checking CI: {e}") + return False, f"API error: {e}" -def check_review_approved(repo: str, pr_number: int) -> bool: - """Check if PR has approved review.""" - # TODO: query Gitea PR reviews API - return False +def check_review_approved(repo: str, pr_number: int) -> tuple[bool, str]: + """ + Check if PR has at least one approved review and no request_changes. + GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews + """ + owner = settings.gitea_owner + url = f"{GITEA_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/reviews" + + try: + resp = httpx.get(url, headers=GITEA_HEADERS, timeout=10) + resp.raise_for_status() + reviews = resp.json() + + approved = 0 + changes_requested = 0 + for review in reviews: + state = review.get("state", "").upper() + if state == "APPROVED": + approved += 1 + elif state == "REQUEST_CHANGES": + changes_requested += 1 + + if changes_requested > 0: + return False, f"Changes requested ({changes_requested} reviews)" + if approved > 0: + return True, f"Approved ({approved} reviews)" + return False, "No reviews yet" + except httpx.HTTPError as e: + logger.error(f"Gitea API error checking reviews: {e}") + return False, f"API error: {e}" + + +def check_tests_passed(repo: str, work_item_id: str) -> tuple[bool, str]: + """ + Check if test report exists and contains PASS indicator. + File: docs/work-items//13-test-report.md + """ + repo_path = os.path.join(settings.repos_dir, repo) + report_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/13-test-report.md") + + if not os.path.isfile(report_path): + return False, "Test report not found" + + try: + with open(report_path, "r") as f: + content = f.read() + if "PASS" in content or "All tests passed" in content: + return True, "Test report indicates PASS" + return False, "Test report exists but no PASS indicator found" + except OSError as e: + return False, f"Error reading test report: {e}" + + +# Registry for dynamic lookup by name +QG_CHECKS = { + "check_analysis_complete": check_analysis_complete, + "check_architecture_done": check_architecture_done, + "check_ci_green": check_ci_green, + "check_review_approved": check_review_approved, + "check_tests_passed": check_tests_passed, +} diff --git a/src/stages.py b/src/stages.py new file mode 100644 index 0000000..684ff6f --- /dev/null +++ b/src/stages.py @@ -0,0 +1,45 @@ +"""Stage machine for orchestrator pipeline.""" + +STAGE_TRANSITIONS = { + "created": {"next": "analysis", "agent": None, "qg": None}, + "analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_complete"}, + "architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"}, + "development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"}, + "review": {"next": "testing", "agent": "tester", "qg": "check_review_approved"}, + "testing": {"next": "deploy", "agent": None, "qg": "check_tests_passed"}, + "deploy": {"next": "done", "agent": None, "qg": None}, + "done": {"next": None, "agent": None, "qg": None}, +} + + +def get_next_stage(current_stage: str) -> str | None: + """Get the next stage after current.""" + transition = STAGE_TRANSITIONS.get(current_stage) + if not transition: + return None + return transition["next"] + + +def get_agent_for_stage(stage: str) -> str | None: + """Get the agent to launch when entering this stage.""" + transition = STAGE_TRANSITIONS.get(stage) + if not transition: + return None + return transition["agent"] + + +def get_qg_for_stage(current_stage: str) -> str | None: + """Get the QG check function name required to leave current stage.""" + transition = STAGE_TRANSITIONS.get(current_stage) + if not transition: + return None + return transition["qg"] + + +def get_previous_stage(current_stage: str) -> str | None: + """Get the previous stage (for rollback).""" + stages = list(STAGE_TRANSITIONS.keys()) + idx = stages.index(current_stage) if current_stage in stages else -1 + if idx <= 0: + return None + return stages[idx - 1] diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py index 7a02c66..d9417b8 100644 --- a/src/webhooks/gitea.py +++ b/src/webhooks/gitea.py @@ -1,14 +1,53 @@ -from fastapi import APIRouter, Request +"""Gitea webhook handlers — full implementation.""" + +import hmac +import subprocess +import os +import hashlib import json -from ..db import get_db +import logging +import httpx +from fastapi import APIRouter, Request, HTTPException + +from ..config import settings +from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..stages import get_next_stage, get_agent_for_stage +from ..qg.checks import check_ci_green, check_review_approved +from ..notifications import notify_stage_change, notify_qg_failure, notify_error +from ..agents.launcher import launcher +from ..plane_sync import notify_stage_change as plane_notify_stage + +logger = logging.getLogger("orchestrator.webhooks.gitea") router = APIRouter() +# Max retries for developer on request_changes +MAX_DEV_RETRIES = 3 + + +def verify_gitea_signature(body: bytes, signature: str) -> bool: + """Verify Gitea webhook HMAC-SHA256 signature.""" + if not settings.gitea_webhook_secret: + return True # Skip verification if no secret configured + expected = hmac.new( + settings.gitea_webhook_secret.encode(), + body, + hashlib.sha256, + ).hexdigest() + return hmac.compare_digest(expected, signature) + @router.post("/gitea") async def gitea_webhook(request: Request): """Handle Gitea webhook events.""" body = await request.body() + + # Verify HMAC signature + signature = request.headers.get("X-Gitea-Signature", "") + if not verify_gitea_signature(body, signature): + logger.warning("Gitea webhook: invalid signature") + raise HTTPException(status_code=401, detail="Invalid signature") + payload = json.loads(body) # Log event @@ -19,36 +58,232 @@ async def gitea_webhook(request: Request): ("gitea", event_type, body.decode()), ) conn.commit() + conn.close() if event_type == "push": - await handle_push(payload, conn) - elif event_type == "pull_request": - await handle_pr(payload, conn) + await handle_push(payload) + elif event_type in ("pull_request", "pull_request_approved", "pull_request_review_approved"): + await handle_pr(payload) elif event_type == "status": - await handle_ci_status(payload, conn) + await handle_ci_status(payload) - conn.close() return {"status": "accepted"} -async def handle_push(payload: dict, conn): - """Push event — log for now.""" - pass +async def handle_push(payload: dict): + """ + Push event: + - If stage=architecture and push contains ADR files → advance to development + - If stage=development and push contains src/ → wait for CI + """ + ref = payload.get("ref", "") + # Extract branch: refs/heads/feature/ET-003-slug → feature/ET-003-slug + if not ref.startswith("refs/heads/"): + return + branch = ref.removeprefix("refs/heads/") + + repo_name = payload.get("repository", {}).get("name", settings.default_repo) + + task = get_task_by_repo_branch(repo_name, branch) + if not task: + logger.debug(f"Push to '{branch}' — no matching task found") + return + + task_id = task["id"] + current_stage = task["stage"] + work_item_id = task.get("work_item_id", "") + + # Collect modified files from commits + modified_files = set() + for commit in payload.get("commits", []): + modified_files.update(commit.get("added", [])) + modified_files.update(commit.get("modified", [])) + + if current_stage == "architecture": + # Check if ADR files were pushed + has_adr = any( + f"docs/work-items/{work_item_id}/06-adr/" in f + or f"docs/work-items/{work_item_id}/07-infra-requirements.md" == f + for f in modified_files + ) + if has_adr: + # Advance to development + next_stage = "development" + update_task_stage(task_id, next_stage) + notify_stage_change(task_id, current_stage, next_stage) + plane_notify_stage(work_item_id, current_stage, next_stage) + + agent = get_agent_for_stage(current_stage) + if agent: + try: + task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" + run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})") + except Exception as e: + notify_error(task_id, f"Failed to launch agent '{agent}': {e}") + + elif current_stage == "development": + # Source files pushed — just log, wait for CI + has_src = any(f.startswith("src/") for f in modified_files) + if has_src: + logger.info(f"Task {task_id}: source push detected on '{branch}', waiting for CI") -async def handle_pr(payload: dict, conn): - """PR event — check reviews, CI status.""" +async def handle_ci_status(payload: dict): + """ + CI status update: + - If state=success and stage=development → advance to review, launch reviewer + - If state=failure → log + """ + state = payload.get("state", "") + # Extract branch from target_url or branches + branches = payload.get("branches", []) + branch = "" + if branches: + branch = branches[0].get("name", "") + + # Alternative: find branch by SHA from tasks DB + if not branch: + sha = payload.get("sha", "") + repo_name = payload.get("repository", {}).get("name", settings.default_repo) + # Try to find task by checking git branch containing this SHA + try: + result = subprocess.run( + ["git", "-C", os.path.join(settings.repos_dir, repo_name), + "branch", "-r", "--contains", sha], + capture_output=True, text=True, timeout=10, + ) + for line in result.stdout.strip().splitlines(): + b = line.strip().replace("origin/", "") + if b.startswith("feature/"): + branch = b + break + except Exception: + pass + if not branch: + logger.debug(f"CI status event: could not determine branch for sha={sha}") + return + + repo_name = payload.get("repository", {}).get("name", settings.default_repo) + task = get_task_by_repo_branch(repo_name, branch) + if not task: + return + + task_id = task["id"] + current_stage = task["stage"] + work_item_id = task.get("work_item_id", "") + + if state == "success" and current_stage == "development": + # Verify CI is actually green via API (double-check) + passed, reason = check_ci_green(repo_name, branch) + if passed: + next_stage = "review" + update_task_stage(task_id, next_stage) + notify_stage_change(task_id, current_stage, next_stage) + plane_notify_stage(work_item_id, current_stage, next_stage) + + agent = get_agent_for_stage(current_stage) + if agent: + try: + task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" + run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})") + except Exception as e: + notify_error(task_id, f"Failed to launch agent '{agent}': {e}") + else: + notify_qg_failure(task_id, current_stage, "check_ci_green", reason) + + elif state == "failure": + logger.warning(f"Task {task_id}: CI failed on branch '{branch}'") + notify_error(task_id, f"CI failed on branch '{branch}'") + + +async def handle_pr(payload: dict): + """ + PR event: + - action=reviewed + approved → advance to testing, launch tester + - action=reviewed + request_changes → back to development, relaunch developer (max 3x) + - action=closed + merged → stage=done + """ action = payload.get("action", "") pr = payload.get("pull_request", {}) + review = payload.get("review", {}) - if action == "reviewed" and pr.get("state") == "approved": - # TODO: QG-5 check -> launch Tester - pass + # Get branch from PR head + head_branch = pr.get("head", {}).get("ref", "") + repo_name = payload.get("repository", {}).get("name", settings.default_repo) + if not head_branch: + return -async def handle_ci_status(payload: dict, conn): - """CI status update — check if all green -> advance.""" - state = payload.get("state", "") - if state == "success": - # TODO: Check all required contexts green -> advance stage - pass + task = get_task_by_repo_branch(repo_name, head_branch) + if not task: + logger.debug(f"PR event for branch '{head_branch}' — no matching task") + return + + task_id = task["id"] + current_stage = task["stage"] + work_item_id = task.get("work_item_id", "") + + if action == "reviewed": + # Gitea sends review.state (older) or review.type (newer format) + review_state = review.get("state", "").upper() + if not review_state and review.get("type", ""): + # Map type field: "pull_request_review_approved" -> "APPROVED" + rtype = review.get("type", "") + if "approved" in rtype.lower(): + review_state = "APPROVED" + elif "request_changes" in rtype.lower() or "rejected" in rtype.lower(): + review_state = "REQUEST_CHANGES" + + if review_state == "APPROVED" and current_stage == "review": + # Advance to testing + pr_number = pr.get("number") + passed, reason = check_review_approved(repo_name, pr_number) + if passed: + next_stage = "testing" + update_task_stage(task_id, next_stage) + notify_stage_change(task_id, current_stage, next_stage) + plane_notify_stage(work_item_id, current_stage, next_stage) + + agent = get_agent_for_stage(current_stage) + if agent: + try: + task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}" + run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})") + except Exception as e: + notify_error(task_id, f"Failed to launch agent '{agent}': {e}") + else: + notify_qg_failure(task_id, current_stage, "check_review_approved", reason) + + elif review_state == "REQUEST_CHANGES" and current_stage == "review": + # Count retries + conn = get_db() + retry_count = conn.execute( + "SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'", + (task_id,), + ).fetchone()["cnt"] + conn.close() + + if retry_count < MAX_DEV_RETRIES: + # Back to development, relaunch developer + update_task_stage(task_id, "development") + notify_stage_change(task_id, current_stage, "development") + try: + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n" + f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})" + ) + run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})") + except Exception as e: + notify_error(task_id, f"Failed to relaunch developer: {e}") + else: + notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached, escalating") + logger.error(f"Task {task_id}: max retries reached, needs manual intervention") + + elif action == "closed" and pr.get("merged", False): + update_task_stage(task_id, "done") + notify_stage_change(task_id, current_stage, "done") + logger.info(f"Task {task_id}: PR merged, stage → done") diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index adabd62..395298c 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -1,14 +1,58 @@ -from fastapi import APIRouter, Request +"""Plane webhook handlers — full implementation.""" + +import hmac +import hashlib +import re import json -from ..db import get_db +import logging +import httpx +from fastapi import APIRouter, Request, HTTPException + +from ..config import settings +from ..db import ( + get_db, + get_task_by_plane_id, + get_next_work_item_id, + update_task_stage, +) +from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage +from ..qg.checks import QG_CHECKS +from ..notifications import notify_stage_change, notify_qg_failure, notify_error +from ..agents.launcher import launcher +from ..plane_sync import ( + notify_stage_change as plane_notify_stage, + notify_qg_failure as plane_notify_qg, + notify_done as plane_notify_done, +) + +logger = logging.getLogger("orchestrator.webhooks.plane") router = APIRouter() +def verify_plane_signature(body: bytes, signature: str) -> bool: + """Verify Plane webhook HMAC-SHA256 signature.""" + if not settings.plane_webhook_secret: + return True # Skip verification if no secret configured + expected = hmac.new( + settings.plane_webhook_secret.encode(), + body, + hashlib.sha256, + ).hexdigest() + return hmac.compare_digest(expected, signature) + + @router.post("/plane") async def plane_webhook(request: Request): """Handle Plane webhook events.""" body = await request.body() + + # Verify HMAC signature + signature = request.headers.get("X-Plane-Signature", "") + if not verify_plane_signature(body, signature): + logger.warning("Plane webhook: invalid signature") + raise HTTPException(status_code=401, detail="Invalid signature") + payload = json.loads(body) # Log event @@ -18,32 +62,216 @@ async def plane_webhook(request: Request): ("plane", payload.get("event", "unknown"), body.decode()), ) conn.commit() + conn.close() event = payload.get("event") + action = payload.get("action", "") data = payload.get("data", {}) - if event == "work_item.created": - await handle_work_item_created(data, conn) - elif event == "comment.created": - await handle_comment(data, conn) + if (event == "work_item.created") or (event == "issue" and action == "created"): + await handle_work_item_created(data) + elif (event == "comment.created") or (event == "issue_comment" and action == "created"): + await handle_comment(data) - conn.close() return {"status": "accepted"} -async def handle_work_item_created(data: dict, conn): - """New work item -> create task record.""" +async def handle_work_item_created(data: dict): + """ + New work item created in Plane. + 1. Generate work_item_id + 2. Create task in DB + 3. Create branch in Gitea + 4. Create initial docs folder + 5. Set stage to 'analysis' + """ plane_id = data.get("id", "") + name = data.get("name", "untitled") + repo = settings.default_repo + + # Generate work item ID + work_item_id = get_next_work_item_id(repo) + + # Create slug from name + slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30] + branch = f"feature/{work_item_id}-{slug}" + + # Insert task into DB + conn = get_db() conn.execute( - "INSERT INTO tasks (plane_id, repo, stage) VALUES (?, ?, ?)", - (plane_id, "enduro-trails", "analysis"), + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)", + (plane_id, work_item_id, repo, branch, "analysis", plane_id), ) conn.commit() + conn.close() + + # Create branch in Gitea + try: + await _create_gitea_branch(repo, branch) + except Exception as e: + logger.error(f"Failed to create branch '{branch}': {e}") + # Task is created, branch creation failed — log but don't crash + notify_error(0, f"Branch creation failed: {e}") + return + + # Create initial docs structure via Gitea API (create file) + try: + await _create_initial_docs(repo, branch, work_item_id, name) + except Exception as e: + logger.error(f"Failed to create initial docs: {e}") + + logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis") -async def handle_comment(data: dict, conn): - """Check for :approved: reaction -> advance stage.""" - comment_body = data.get("comment", "") +async def handle_comment(data: dict): + """ + Handle comment event — check for :approved: or :rejected:. + Advance or rollback stage accordingly. + """ + comment_body = data.get("comment", data.get("body", data.get("comment_html", ""))) + plane_id = data.get("work_item_id", data.get("issue_id", "")) + + if not plane_id: + logger.warning("Comment event without work_item_id, skipping") + return + + task = get_task_by_plane_id(plane_id) + if not task: + logger.warning(f"No task found for plane_id={plane_id}") + return + + task_id = task["id"] + current_stage = task["stage"] + repo = task["repo"] + work_item_id = task.get("work_item_id", "") + branch = task.get("branch", "") + + if ":rejected:" in comment_body: + # Rollback to previous stage + prev_stage = get_previous_stage(current_stage) + if prev_stage: + update_task_stage(task_id, prev_stage) + notify_stage_change(task_id, current_stage, prev_stage) + logger.info(f"Task {task_id}: rejected, rolled back {current_stage} → {prev_stage}") + return + if ":approved:" in comment_body: - # TODO: Determine which task, advance QG - pass + # Try to advance stage + await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch) + + +async def _try_advance_stage( + task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str +): + """Run QG check for current stage and advance if passed.""" + qg_name = get_qg_for_stage(current_stage) + next_stage = get_next_stage(current_stage) + + if not next_stage: + logger.info(f"Task {task_id}: already at terminal stage '{current_stage}'") + return + + # Run QG check if one is required + if qg_name: + qg_func = QG_CHECKS.get(qg_name) + if not qg_func: + logger.error(f"QG function '{qg_name}' not found in registry") + return + + # Determine args based on QG function + if qg_name in ("check_analysis_complete", "check_architecture_done", "check_tests_passed"): + passed, reason = qg_func(repo, work_item_id) + elif qg_name == "check_ci_green": + passed, reason = qg_func(repo, branch) + elif qg_name == "check_review_approved": + # Find PR number by branch via Gitea API + import httpx as _httpx + from ..config import settings as _s + _owner = _s.gitea_owner + _url = f"{_s.gitea_url}/api/v1/repos/{_owner}/{repo}/pulls?state=open&limit=50" + _headers = {"Authorization": f"token {_s.gitea_token}"} + try: + _resp = _httpx.get(_url, headers=_headers, timeout=10) + _prs = _resp.json() + _pr_number = None + for _pr in _prs: + if _pr.get("head", {}).get("ref") == branch: + _pr_number = _pr["number"] + break + if _pr_number: + passed, reason = qg_func(repo, _pr_number) + else: + # No open PR but review file exists — check file-based + import os + _review_path = os.path.join(_s.repos_dir, repo, f"docs/work-items/{work_item_id}/12-review.md") + _review_path2 = os.path.join(_s.repos_dir, repo, f"docs/work-items/{work_item_id}/09-review.md") + if os.path.isfile(_review_path) or os.path.isfile(_review_path2): + passed, reason = True, "Review file exists (file-based approval)" + else: + passed, reason = False, "No open PR found and no review file" + except Exception as _e: + passed, reason = False, f"Error finding PR: {_e}" + else: + passed, reason = False, f"Unknown QG: {qg_name}" + + if not passed: + notify_qg_failure(task_id, current_stage, qg_name, reason) + plane_notify_qg(work_item_id, current_stage, qg_name, reason) + return + + # Advance stage + update_task_stage(task_id, next_stage) + notify_stage_change(task_id, current_stage, next_stage) + plane_notify_stage(work_item_id, current_stage, next_stage) + + # Launch agent associated with the current stage's transition + agent = get_agent_for_stage(current_stage) + if agent: + try: + task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" + run_id = launcher.launch(agent, repo, task_desc, task_id=task_id) + plane_notify_stage(work_item_id, current_stage, next_stage, agent) + logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}") + except Exception as e: + notify_error(task_id, f"Failed to launch agent '{agent}': {e}") + logger.error(f"Agent launch failed: {e}") + + +async def _create_gitea_branch(repo: str, branch: str): + """Create a new branch in Gitea from main.""" + owner = settings.gitea_owner + url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches" + headers = {"Authorization": f"token {settings.gitea_token}"} + payload = {"new_branch_name": branch, "old_branch_name": "main"} + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=payload, headers=headers, timeout=10) + if resp.status_code == 409: + logger.info(f"Branch '{branch}' already exists") + return + resp.raise_for_status() + logger.info(f"Created branch '{branch}' in {owner}/{repo}") + + +async def _create_initial_docs(repo: str, branch: str, work_item_id: str, name: str): + """Create initial business request doc in the feature branch.""" + owner = settings.gitea_owner + file_path = f"docs/work-items/{work_item_id}/00-business-request.md" + url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}" + headers = {"Authorization": f"token {settings.gitea_token}"} + + import base64 + content = f"# Business Request: {name}\n\nWork Item ID: {work_item_id}\n\n## Description\n\nTBD\n" + encoded = base64.b64encode(content.encode()).decode() + + payload = { + "message": f"docs: init {work_item_id} business request", + "content": encoded, + "branch": branch, + } + + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=payload, headers=headers, timeout=10) + if resp.status_code in (201, 422): # 422 = already exists + return + resp.raise_for_status() diff --git a/tests/test_qg.py b/tests/test_qg.py new file mode 100644 index 0000000..fbaf52c --- /dev/null +++ b/tests/test_qg.py @@ -0,0 +1,188 @@ +import pytest +import os +import tempfile +from unittest.mock import patch, MagicMock +import httpx + +# Override DB path before importing app +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +from src.qg.checks import ( + check_analysis_complete, + check_architecture_done, + check_ci_green, + check_review_approved, + check_tests_passed, +) + + +@pytest.fixture(autouse=True) +def setup_work_item_dir(tmp_path, monkeypatch): + """Create temp repo structure for filesystem checks.""" + monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path)) + repo_dir = tmp_path / "enduro-trails" + repo_dir.mkdir() + return repo_dir + + +class TestCheckAnalysisComplete: + def test_all_files_present(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + wi_dir = repo_dir / "docs" / "work-items" / "ET-001" + wi_dir.mkdir(parents=True) + (wi_dir / "01-brd.md").write_text("# BRD") + (wi_dir / "02-trz.md").write_text("# TRZ") + (wi_dir / "03-acceptance-criteria.md").write_text("# AC") + (wi_dir / "04-test-plan.yaml").write_text("tests: []") + + passed, reason = check_analysis_complete("enduro-trails", "ET-001") + assert passed is True + + def test_missing_files(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + wi_dir = repo_dir / "docs" / "work-items" / "ET-002" + wi_dir.mkdir(parents=True) + (wi_dir / "01-brd.md").write_text("# BRD") + + passed, reason = check_analysis_complete("enduro-trails", "ET-002") + assert passed is False + assert "Missing files" in reason + + def test_no_directory(self, setup_work_item_dir): + passed, reason = check_analysis_complete("enduro-trails", "ET-999") + assert passed is False + + +class TestCheckArchitectureDone: + def test_adr_directory_with_files(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + adr_dir = repo_dir / "docs" / "work-items" / "ET-001" / "06-adr" + adr_dir.mkdir(parents=True) + (adr_dir / "001-use-postgres.md").write_text("# ADR") + + passed, reason = check_architecture_done("enduro-trails", "ET-001") + assert passed is True + + def test_infra_requirements(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + wi_dir = repo_dir / "docs" / "work-items" / "ET-001" + wi_dir.mkdir(parents=True) + (wi_dir / "07-infra-requirements.md").write_text("# Infra") + + passed, reason = check_architecture_done("enduro-trails", "ET-001") + assert passed is True + + def test_empty_adr_directory(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + adr_dir = repo_dir / "docs" / "work-items" / "ET-001" / "06-adr" + adr_dir.mkdir(parents=True) + + passed, reason = check_architecture_done("enduro-trails", "ET-001") + assert passed is False + + def test_nothing_present(self, setup_work_item_dir): + passed, reason = check_architecture_done("enduro-trails", "ET-001") + assert passed is False + + +class TestCheckCIGreen: + @patch("src.qg.checks.httpx.get") + def test_ci_success(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"state": "success"} + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + passed, reason = check_ci_green("enduro-trails", "feature/ET-001-test") + assert passed is True + assert "green" in reason.lower() + + @patch("src.qg.checks.httpx.get") + def test_ci_pending(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"state": "pending"} + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + passed, reason = check_ci_green("enduro-trails", "feature/ET-001-test") + assert passed is False + + @patch("src.qg.checks.httpx.get") + def test_ci_branch_not_found(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 404 + mock_get.return_value = mock_resp + + passed, reason = check_ci_green("enduro-trails", "nonexistent") + assert passed is False + + +class TestCheckReviewApproved: + @patch("src.qg.checks.httpx.get") + def test_approved(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = [ + {"state": "APPROVED", "user": {"login": "reviewer1"}} + ] + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + passed, reason = check_review_approved("enduro-trails", 1) + assert passed is True + + @patch("src.qg.checks.httpx.get") + def test_changes_requested(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = [ + {"state": "REQUEST_CHANGES", "user": {"login": "reviewer1"}} + ] + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + passed, reason = check_review_approved("enduro-trails", 1) + assert passed is False + assert "Changes requested" in reason + + @patch("src.qg.checks.httpx.get") + def test_no_reviews(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = [] + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + passed, reason = check_review_approved("enduro-trails", 1) + assert passed is False + + +class TestCheckTestsPassed: + def test_report_with_pass(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + wi_dir = repo_dir / "docs" / "work-items" / "ET-001" + wi_dir.mkdir(parents=True) + (wi_dir / "13-test-report.md").write_text("# Test Report\n\nResult: PASS\n") + + passed, reason = check_tests_passed("enduro-trails", "ET-001") + assert passed is True + + def test_report_without_pass(self, setup_work_item_dir): + repo_dir = setup_work_item_dir + wi_dir = repo_dir / "docs" / "work-items" / "ET-001" + wi_dir.mkdir(parents=True) + (wi_dir / "13-test-report.md").write_text("# Test Report\n\nResult: FAIL\n") + + passed, reason = check_tests_passed("enduro-trails", "ET-001") + assert passed is False + + def test_no_report(self, setup_work_item_dir): + passed, reason = check_tests_passed("enduro-trails", "ET-001") + assert passed is False + assert "not found" in reason.lower() diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py index 99253d8..cc5d7d7 100644 --- a/tests/test_webhooks.py +++ b/tests/test_webhooks.py @@ -1,20 +1,26 @@ import pytest import os import tempfile +from unittest.mock import patch, MagicMock, AsyncMock # Override DB path before importing app _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator.db") os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_HOST_REPOS_DIR"] = "/home/slin/repos" +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" +os.environ["ORCH_GITEA_OWNER"] = "admin" +os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails" from fastapi.testclient import TestClient from src.main import app -from src.db import init_db +from src.db import init_db, get_db @pytest.fixture(autouse=True) def setup_db(): """Ensure DB tables exist before each test.""" - # Remove old test db if exists if os.path.exists(_test_db): os.unlink(_test_db) init_db() @@ -33,7 +39,16 @@ def test_health(): assert resp.json()["service"] == "orchestrator" -def test_plane_webhook_accepts(): +def test_status_endpoint(): + resp = client.get("/status") + assert resp.status_code == 200 + assert "active_tasks" in resp.json() + + +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +def test_plane_webhook_creates_task(mock_docs, mock_branch): + """work_item.created → task in DB with stage=analysis.""" resp = client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "test-123", "name": "Test task", "project": "proj-1"} @@ -41,32 +56,208 @@ def test_plane_webhook_accepts(): assert resp.status_code == 200 assert resp.json()["status"] == "accepted" + # Verify task was created + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'test-123'").fetchone() + conn.close() + assert task is not None + assert task["stage"] == "analysis" + assert task["work_item_id"] is not None + assert "feature/" in task["branch"] -def test_plane_webhook_comment(): + +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch): + """Multiple work items get sequential IDs.""" + client.post("/webhook/plane", json={ + "event": "work_item.created", + "data": {"id": "item-1", "name": "First task", "project": "proj-1"} + }) + client.post("/webhook/plane", json={ + "event": "work_item.created", + "data": {"id": "item-2", "name": "Second task", "project": "proj-1"} + }) + + conn = get_db() + tasks = conn.execute("SELECT work_item_id FROM tasks ORDER BY id").fetchall() + conn.close() + ids = [t["work_item_id"] for t in tasks] + assert ids[0] == "ET-001" + assert ids[1] == "ET-002" + + +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +@patch("src.webhooks.plane.launcher") +def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch): + """Comment :approved: at stage=analysis → advance to architecture.""" + # Patch repos_dir for QG check + monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path)) + + # Create task first + client.post("/webhook/plane", json={ + "event": "work_item.created", + "data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"} + }) + + # Get the task to find work_item_id + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone() + conn.close() + work_item_id = task["work_item_id"] + + # Create required analysis files + wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id + wi_dir.mkdir(parents=True) + (wi_dir / "01-brd.md").write_text("# BRD") + (wi_dir / "02-trz.md").write_text("# TRZ") + (wi_dir / "03-acceptance-criteria.md").write_text("# AC") + (wi_dir / "04-test-plan.yaml").write_text("tests: []") + + # Mock launcher + mock_launcher.launch.return_value = 1 + + # Send approved comment resp = client.post("/webhook/plane", json={ "event": "comment.created", - "data": {"comment": "LGTM :approved:"} + "data": { + "work_item_id": "adv-001", + "comment": "Looks good :approved:" + } }) assert resp.status_code == 200 - assert resp.json()["status"] == "accepted" + + # Verify stage advanced + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone() + conn.close() + assert task["stage"] == "architecture" + + +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +def test_plane_rejected_rolls_back(mock_docs, mock_branch): + """Comment :rejected: rolls back stage.""" + # Create task + client.post("/webhook/plane", json={ + "event": "work_item.created", + "data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"} + }) + + # Manually set stage to architecture + conn = get_db() + conn.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'") + conn.commit() + conn.close() + + # Send rejected comment + resp = client.post("/webhook/plane", json={ + "event": "comment.created", + "data": { + "work_item_id": "rej-001", + "comment": "Not ready :rejected:" + } + }) + assert resp.status_code == 200 + + # Verify stage rolled back + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'rej-001'").fetchone() + conn.close() + assert task["stage"] == "analysis" def test_gitea_webhook_push(): + """Push event is accepted.""" resp = client.post( "/webhook/gitea", - json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}}, + json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}, "commits": []}, headers={"X-Gitea-Event": "push"} ) assert resp.status_code == 200 assert resp.json()["status"] == "accepted" -def test_gitea_webhook_pr(): +@patch("src.webhooks.gitea.launcher") +def test_gitea_push_with_adr_advances_stage(mock_launcher): + """Push with ADR files at architecture stage → advance to development.""" + mock_launcher.launch.return_value = 1 + + # Create a task at architecture stage + conn = get_db() + conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)", + ("push-001", "ET-010", "enduro-trails", "feature/ET-010-test", "architecture"), + ) + conn.commit() + conn.close() + + # Push with ADR file resp = client.post( "/webhook/gitea", json={ - "action": "reviewed", - "pull_request": {"state": "approved", "number": 1} + "ref": "refs/heads/feature/ET-010-test", + "repository": {"name": "enduro-trails"}, + "commits": [ + {"added": ["docs/work-items/ET-010/06-adr/001-decision.md"], "modified": []} + ], + }, + headers={"X-Gitea-Event": "push"} + ) + assert resp.status_code == 200 + + # Verify stage advanced + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'push-001'").fetchone() + conn.close() + assert task["stage"] == "development" + mock_launcher.launch.assert_called_once() + + +@patch("src.webhooks.gitea.check_ci_green") +@patch("src.webhooks.gitea.launcher") +def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci): + """CI success at development stage → advance to review.""" + mock_ci.return_value = (True, "CI green") + mock_launcher.launch.return_value = 2 + + # Create a task at development stage + conn = get_db() + conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)", + ("ci-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"), + ) + conn.commit() + conn.close() + + # CI status success + resp = client.post( + "/webhook/gitea", + json={ + "state": "success", + "branches": [{"name": "feature/ET-011-test"}], + "repository": {"name": "enduro-trails"}, + }, + headers={"X-Gitea-Event": "status"} + ) + assert resp.status_code == 200 + + # Verify stage advanced + conn = get_db() + task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-001'").fetchone() + conn.close() + assert task["stage"] == "review" + + +def test_gitea_webhook_pr(): + """PR event is accepted.""" + resp = client.post( + "/webhook/gitea", + json={ + "action": "opened", + "pull_request": {"head": {"ref": "feature/test"}, "number": 1}, + "repository": {"name": "enduro-trails"}, }, headers={"X-Gitea-Event": "pull_request"} ) @@ -74,18 +265,17 @@ def test_gitea_webhook_pr(): assert resp.json()["status"] == "accepted" -def test_status_endpoint(): - resp = client.get("/status") - assert resp.status_code == 200 - assert "active_tasks" in resp.json() - - -def test_plane_webhook_creates_task(): - """Verify that work_item.created actually inserts a task.""" +def test_plane_webhook_event_logged(): + """Events are logged in the events table.""" client.post("/webhook/plane", json={ - "event": "work_item.created", - "data": {"id": "task-456", "name": "New feature", "project": "proj-2"} + "event": "test.event", + "data": {"foo": "bar"} }) - resp = client.get("/status") - tasks = resp.json()["active_tasks"] - assert any(t["plane_id"] == "task-456" for t in tasks) + + conn = get_db() + event = conn.execute( + "SELECT * FROM events WHERE event_type = 'test.event'" + ).fetchone() + conn.close() + assert event is not None + assert event["source"] == "plane"