Files
orchestrator/tests/test_webhooks.py
Dev Agent b545665e2d feat: full pipeline fixes - CI status branch lookup, review webhook routing, auto-advance, plane sync
- handle_ci_status: fall back to `git branch -r --contains` when branches[] is empty
- webhook router: handle pull_request_approved event type
- handle_pr: map review.type to review.state for new Gitea format
- launcher: auto-advance stage after agent completion (_try_advance_stage)
- plane_sync: notify Plane on stage changes
- stages.py: stage machine with QG definitions
- notifications.py: stage change notifications
- safe.directory fix for container git operations
2026-05-22 01:57:02 +03:00

282 lines
9.0 KiB
Python

import pytest
import os
import tempfile
from unittest.mock import patch, MagicMock, AsyncMock
# Override DB path before importing app
# NOTE(review): every ORCH_* variable below is set before `src.main` is
# imported further down; presumably the application reads its settings from
# the environment at import time — confirm against the settings module.
# The database lives at a fixed file name inside the system temp directory.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator.db")
os.environ["ORCH_DB_PATH"] = _test_db
# Repositories directory is pointed at the system temp directory for tests.
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_HOST_REPOS_DIR"] = "/home/slin/repos"
# Placeholder credentials for the test run.
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
from fastapi.testclient import TestClient
from src.main import app
from src.db import init_db, get_db
@pytest.fixture(autouse=True)
def setup_db():
    """Give every test a fresh database file and remove it afterwards.

    A database file left at ``_test_db`` is deleted, ``init_db()`` is called
    so the tables exist, the test runs, and the file is deleted again.
    """
    def _remove_db_file():
        # Deleting is skipped when the file is already absent.
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    _remove_db_file()
    init_db()
    yield
    _remove_db_file()
# Module-level test client wrapping the application; used by every test below.
client = TestClient(app)
def test_health():
    """GET /health answers 200 with status ``ok`` for service ``orchestrator``."""
    response = client.get("/health")
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "ok"
    assert payload["service"] == "orchestrator"
def test_status_endpoint():
    """GET /status answers 200 and its JSON body has an ``active_tasks`` entry."""
    response = client.get("/status")
    assert response.status_code == 200
    body = response.json()
    assert "active_tasks" in body
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_webhook_creates_task(mock_docs, mock_branch):
    """A ``work_item.created`` webhook stores a task at stage ``analysis``."""
    payload = {
        "event": "work_item.created",
        "data": {"id": "test-123", "name": "Test task", "project": "proj-1"},
    }
    response = client.post("/webhook/plane", json=payload)
    assert response.status_code == 200
    assert response.json()["status"] == "accepted"

    # Read the stored row straight back from the database.
    db = get_db()
    row = db.execute("SELECT * FROM tasks WHERE plane_id = 'test-123'").fetchone()
    db.close()

    assert row is not None
    assert row["stage"] == "analysis"
    assert row["work_item_id"] is not None
    assert "feature/" in row["branch"]
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch):
    """Work items created back to back receive the ids ET-001 and ET-002."""
    new_items = (("item-1", "First task"), ("item-2", "Second task"))
    for plane_id, title in new_items:
        resp = client.post("/webhook/plane", json={
            "event": "work_item.created",
            "data": {"id": plane_id, "name": title, "project": "proj-1"}
        })
        # A rejected webhook should fail here, not later as a missing row.
        assert resp.status_code == 200

    conn = get_db()
    try:
        tasks = conn.execute("SELECT work_item_id FROM tasks ORDER BY id").fetchall()
    finally:
        # Close the connection even when the query itself raises.
        conn.close()

    ids = [t["work_item_id"] for t in tasks]
    # Comparing a slice yields a readable assertion diff when fewer than two
    # rows exist, where indexing ids[0] / ids[1] would raise IndexError.
    assert ids[:2] == ["ET-001", "ET-002"]
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane.launcher")
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
    """Comment :approved: at stage=analysis → advance to architecture.

    ``settings.repos_dir`` as seen by ``src.qg.checks`` is redirected to
    ``tmp_path`` and the four analysis documents are written there before
    the approval comment is sent.
    """
    # Patch repos_dir for QG check
    monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))

    # Create task first
    created = client.post("/webhook/plane", json={
        "event": "work_item.created",
        "data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"}
    })
    assert created.status_code == 200

    # Get the task to find work_item_id
    conn = get_db()
    try:
        task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone()
    finally:
        conn.close()
    # Fail with a clear assertion rather than a TypeError on None below.
    assert task is not None
    work_item_id = task["work_item_id"]

    # Create required analysis files
    wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
    wi_dir.mkdir(parents=True)
    analysis_docs = (
        ("01-brd.md", "# BRD"),
        ("02-trz.md", "# TRZ"),
        ("03-acceptance-criteria.md", "# AC"),
        ("04-test-plan.yaml", "tests: []"),
    )
    for filename, content in analysis_docs:
        (wi_dir / filename).write_text(content)

    # Mock launcher
    mock_launcher.launch.return_value = 1

    # Send approved comment
    resp = client.post("/webhook/plane", json={
        "event": "comment.created",
        "data": {
            "work_item_id": "adv-001",
            "comment": "Looks good :approved:"
        }
    })
    assert resp.status_code == 200

    # Verify stage advanced
    conn = get_db()
    try:
        task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone()
    finally:
        conn.close()
    assert task is not None
    assert task["stage"] == "architecture"
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_rejected_rolls_back(mock_docs, mock_branch):
    """A ``:rejected:`` comment moves a task from architecture back to analysis."""
    # Register the work item through the webhook.
    creation_payload = {
        "event": "work_item.created",
        "data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"},
    }
    client.post("/webhook/plane", json=creation_payload)

    # Put the task at the architecture stage directly in the database.
    db = get_db()
    db.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'")
    db.commit()
    db.close()

    # Deliver the rejecting comment.
    rejection_payload = {
        "event": "comment.created",
        "data": {
            "work_item_id": "rej-001",
            "comment": "Not ready :rejected:",
        },
    }
    response = client.post("/webhook/plane", json=rejection_payload)
    assert response.status_code == 200

    # The stored stage must be back at analysis.
    db = get_db()
    row = db.execute("SELECT * FROM tasks WHERE plane_id = 'rej-001'").fetchone()
    db.close()
    assert row["stage"] == "analysis"
def test_gitea_webhook_push():
    """A Gitea ``push`` event is answered with 200 and status ``accepted``."""
    push_payload = {
        "ref": "refs/heads/feature/test",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    event_headers = {"X-Gitea-Event": "push"}
    response = client.post("/webhook/gitea", json=push_payload, headers=event_headers)
    assert response.status_code == 200
    assert response.json()["status"] == "accepted"
@patch("src.webhooks.gitea.launcher")
def test_gitea_push_with_adr_advances_stage(mock_launcher):
    """A push adding an ADR file at stage architecture advances to development."""
    mock_launcher.launch.return_value = 1

    # Seed a task that is already at the architecture stage.
    seed_row = ("push-001", "ET-010", "enduro-trails", "feature/ET-010-test", "architecture")
    db = get_db()
    db.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
        seed_row,
    )
    db.commit()
    db.close()

    # Push event whose single commit adds an ADR document.
    adr_commit = {"added": ["docs/work-items/ET-010/06-adr/001-decision.md"], "modified": []}
    push_payload = {
        "ref": "refs/heads/feature/ET-010-test",
        "repository": {"name": "enduro-trails"},
        "commits": [adr_commit],
    }
    response = client.post(
        "/webhook/gitea",
        json=push_payload,
        headers={"X-Gitea-Event": "push"},
    )
    assert response.status_code == 200

    # The stage must have moved on and the launcher been invoked exactly once.
    db = get_db()
    row = db.execute("SELECT * FROM tasks WHERE plane_id = 'push-001'").fetchone()
    db.close()
    assert row["stage"] == "development"
    mock_launcher.launch.assert_called_once()
@patch("src.webhooks.gitea.check_ci_green")
@patch("src.webhooks.gitea.launcher")
def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
    """A successful CI status at stage development advances the task to review."""
    mock_ci.return_value = (True, "CI green")
    mock_launcher.launch.return_value = 2

    # Seed a task that is already at the development stage.
    seed_row = ("ci-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development")
    db = get_db()
    db.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
        seed_row,
    )
    db.commit()
    db.close()

    # Status event reporting success for the task's branch.
    status_payload = {
        "state": "success",
        "branches": [{"name": "feature/ET-011-test"}],
        "repository": {"name": "enduro-trails"},
    }
    response = client.post(
        "/webhook/gitea",
        json=status_payload,
        headers={"X-Gitea-Event": "status"},
    )
    assert response.status_code == 200

    # The stored stage must now be review.
    db = get_db()
    row = db.execute("SELECT * FROM tasks WHERE plane_id = 'ci-001'").fetchone()
    db.close()
    assert row["stage"] == "review"
def test_gitea_webhook_pr():
    """A Gitea ``pull_request`` event is answered with 200 and status ``accepted``."""
    pr_payload = {
        "action": "opened",
        "pull_request": {"head": {"ref": "feature/test"}, "number": 1},
        "repository": {"name": "enduro-trails"},
    }
    event_headers = {"X-Gitea-Event": "pull_request"}
    response = client.post("/webhook/gitea", json=pr_payload, headers=event_headers)
    assert response.status_code == 200
    assert response.json()["status"] == "accepted"
def test_plane_webhook_event_logged():
    """A Plane webhook leaves a row in ``events`` with source ``plane``."""
    payload = {"event": "test.event", "data": {"foo": "bar"}}
    client.post("/webhook/plane", json=payload)

    # Look the logged event up by its event type.
    db = get_db()
    cursor = db.execute(
        "SELECT * FROM events WHERE event_type = 'test.event'"
    )
    logged = cursor.fetchone()
    db.close()

    assert logged is not None
    assert logged["source"] == "plane"