import pytest import os import tempfile from unittest.mock import patch, MagicMock, AsyncMock # Override DB path before importing app _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_PLANE_WEBHOOK_SECRET"] = "" os.environ["ORCH_GITEA_WEBHOOK_SECRET"] = "" os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ["ORCH_HOST_REPOS_DIR"] = "/home/slin/repos" os.environ["ORCH_GITEA_TOKEN"] = "test-token" os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" os.environ["ORCH_GITEA_OWNER"] = "admin" os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails" # ORCH-6: register the test project so the project filter lets these fixtures # through. proj-1 maps to enduro-trails/ET, preserving the ET-001/ET-002 asserts. os.environ["ORCH_PROJECTS_JSON"] = ( '[{"plane_project_id": "proj-1", "repo": "enduro-trails", ' '"work_item_prefix": "ET", "name": "enduro-trails"}]' ) from fastapi.testclient import TestClient from src.main import app from src.db import init_db, get_db @pytest.fixture(autouse=True) def setup_db(): """Ensure DB tables exist before each test.""" if os.path.exists(_test_db): os.unlink(_test_db) init_db() yield if os.path.exists(_test_db): os.unlink(_test_db) client = TestClient(app) def test_health(): resp = client.get("/health") assert resp.status_code == 200 assert resp.json()["status"] == "ok" assert resp.json()["service"] == "orchestrator" def test_status_endpoint(): resp = client.get("/status") assert resp.status_code == 200 assert "active_tasks" in resp.json() @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_webhook_creates_task(mock_docs, mock_branch): """work_item.created → task in DB with stage=analysis.""" resp = client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "test-123", "name": "Test task", "project": "proj-1"} }) assert resp.status_code == 200 assert resp.json()["status"] == "accepted" # Verify task was created conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'test-123'").fetchone() conn.close() assert task is not None assert task["stage"] == "analysis" assert task["work_item_id"] is not None assert "feature/" in task["branch"] @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch): """Multiple work items get sequential IDs.""" client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "item-1", "name": "First task", "project": "proj-1"} }) client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "item-2", "name": "Second task", "project": "proj-1"} }) conn = get_db() tasks = conn.execute("SELECT work_item_id FROM tasks ORDER BY id").fetchall() conn.close() ids = [t["work_item_id"] for t in tasks] assert ids[0] == "ET-001" assert ids[1] == "ET-002" @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane.launcher") def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch): """Comment :approved: at stage=analysis → advance to architecture.""" # Patch repos_dir for QG check monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path)) # Create task first client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"} }) # Get the task to find work_item_id conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone() conn.close() work_item_id = task["work_item_id"] # Create required analysis files wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id wi_dir.mkdir(parents=True) (wi_dir / "01-brd.md").write_text("# BRD") (wi_dir / "02-trz.md").write_text("# TRZ") (wi_dir / "03-acceptance-criteria.md").write_text("# AC") (wi_dir / "04-test-plan.yaml").write_text("tests: []") # Mock launcher mock_launcher.launch.return_value = 1 # Send approved comment resp = client.post("/webhook/plane", json={ "event": "comment.created", "data": { "work_item_id": "adv-001", "comment": "Looks good :approved:" } }) assert resp.status_code == 200 # Verify stage advanced conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone() conn.close() assert task["stage"] == "architecture" @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_rejected_rolls_back(mock_docs, mock_branch): """Comment :rejected: rolls back stage.""" # Create task client.post("/webhook/plane", json={ "event": "work_item.created", "data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"} }) # Manually set stage to architecture conn = get_db() conn.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'") conn.commit() conn.close() # Send rejected comment resp = client.post("/webhook/plane", json={ "event": "comment.created", "data": { "work_item_id": "rej-001", "comment": "Not ready :rejected:" } }) assert resp.status_code == 200 # Verify stage rolled back conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'rej-001'").fetchone() conn.close() assert task["stage"] == "analysis" def test_gitea_webhook_push(): """Push event is accepted.""" resp = client.post( "/webhook/gitea", json={"ref": "refs/heads/feature/test", "repository": {"name": "enduro-trails"}, "commits": []}, headers={"X-Gitea-Event": "push"} ) assert resp.status_code == 200 assert resp.json()["status"] == "accepted" @patch("src.webhooks.gitea.launcher") def test_gitea_push_with_adr_advances_stage(mock_launcher): """Push with ADR files at architecture stage → advance to development.""" mock_launcher.launch.return_value = 1 # Create a task at architecture stage conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)", ("push-001", "ET-010", "enduro-trails", "feature/ET-010-test", "architecture"), ) conn.commit() conn.close() # Push with ADR file resp = client.post( "/webhook/gitea", json={ "ref": "refs/heads/feature/ET-010-test", "repository": {"name": "enduro-trails"}, "commits": [ {"added": ["docs/work-items/ET-010/06-adr/001-decision.md"], "modified": []} ], }, headers={"X-Gitea-Event": "push"} ) assert resp.status_code == 200 # Verify stage advanced conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'push-001'").fetchone() conn.close() assert task["stage"] == "development" mock_launcher.launch.assert_called_once() @patch("src.webhooks.gitea.check_ci_green") @patch("src.webhooks.gitea.launcher") def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci): """CI success at development stage → advance to review.""" mock_ci.return_value = (True, "CI green") mock_launcher.launch.return_value = 2 # Create a task at development stage conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)", ("ci-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"), ) conn.commit() conn.close() # CI status success resp = client.post( "/webhook/gitea", json={ "state": "success", "branches": [{"name": "feature/ET-011-test"}], "repository": {"name": "enduro-trails"}, }, headers={"X-Gitea-Event": "status"} ) assert resp.status_code == 200 # Verify stage advanced conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-001'").fetchone() conn.close() assert task["stage"] == "review" def test_gitea_webhook_pr(): """PR event is accepted.""" resp = client.post( "/webhook/gitea", json={ "action": "opened", "pull_request": {"head": {"ref": "feature/test"}, "number": 1}, "repository": {"name": "enduro-trails"}, }, headers={"X-Gitea-Event": "pull_request"} ) assert resp.status_code == 200 assert resp.json()["status"] == "accepted" def test_plane_webhook_event_logged(): """Events are logged in the events table.""" client.post("/webhook/plane", json={ "event": "test.event", "data": {"foo": "bar"} }) conn = get_db() event = conn.execute( "SELECT * FROM events WHERE event_type = 'test.event'" ).fetchone() conn.close() assert event is not None assert event["source"] == "plane"