"""Tests for the orchestrator HTTP endpoints and the Plane / Gitea webhook handlers."""
import asyncio
import os
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# Override the DB path (and the other ORCH_* settings) before importing the app.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator.db")
os.environ.update(
    {
        "ORCH_DB_PATH": _test_db,
        "ORCH_PLANE_WEBHOOK_SECRET": "",
        "ORCH_GITEA_WEBHOOK_SECRET": "",
        "ORCH_REPOS_DIR": tempfile.gettempdir(),
        "ORCH_HOST_REPOS_DIR": "/home/slin/repos",
        "ORCH_GITEA_TOKEN": "test-token",
        "ORCH_PLANE_API_TOKEN": "test-token",
        "ORCH_GITEA_OWNER": "admin",
        "ORCH_DEFAULT_REPO": "enduro-trails",
        # ORCH-6: register the test project so the project filter lets these
        # fixtures through. proj-1 maps to enduro-trails/ET, which keeps the
        # ET-001/ET-002 assertions valid.
        "ORCH_PROJECTS_JSON": (
            '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
            '"work_item_prefix": "ET", "name": "enduro-trails"}]'
        ),
    }
)

from fastapi.testclient import TestClient  # noqa: E402

from src.main import app  # noqa: E402
from src.db import init_db, get_db  # noqa: E402

client = TestClient(app)

# SQL used to seed tasks directly, without and with the plane_issue_id column.
_INSERT_TASK = (
    "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)"
)
_INSERT_TASK_WITH_ISSUE = (
    "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
    "VALUES (?, ?, ?, ?, ?, ?)"
)


def _drop_test_db():
    """Delete the test database file if it is present."""
    if os.path.exists(_test_db):
        os.unlink(_test_db)


def _write_db(sql, params):
    """Run one parameterised statement on a fresh connection, commit, and close."""
    db = get_db()
    db.execute(sql, params)
    db.commit()
    db.close()


def _read_one(sql):
    """Run a query on a fresh connection and return its first row (or None)."""
    db = get_db()
    row = db.execute(sql).fetchone()
    db.close()
    return row


def _plane_state_change(issue_id, title, state):
    """Build the Plane "issue updated" webhook body used by the status tests."""
    return {
        "event": "issue",
        "action": "updated",
        "data": {
            "id": issue_id,
            "name": title,
            "project": "proj-1",
            "state": state,
        },
    }


def _in_progress_state():
    """Return a fresh copy of the "In Progress" state object sent by Plane."""
    return {
        "id": "b873d9eb-993c-48cd-97ac-99a9b1623967",
        "name": "In Progress",
        "group": "started",
    }


@pytest.fixture(autouse=True)
def setup_db():
    """Give every test a freshly initialised database and remove it afterwards."""
    _drop_test_db()
    init_db()
    yield
    _drop_test_db()


def test_health():
    """GET /health answers 200 with status "ok" for the orchestrator service."""
    response = client.get("/health")
    assert response.status_code == 200
    body = response.json()
    assert body["status"] == "ok"
    assert body["service"] == "orchestrator"


def test_status_endpoint():
    """GET /status answers 200 and exposes an "active_tasks" key."""
    response = client.get("/status")
    assert response.status_code == 200
    assert "active_tasks" in response.json()


@patch("src.plane_sync.add_comment")
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
@patch(
    "src.plane_sync.fetch_issue_fields",
    return_value=("Test task", "This is a detailed test description for the task"),
)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_webhook_creates_task(mock_docs, mock_branch, mock_fetch_fields, mock_fetch_seq, mock_add_comment):
    """An In Progress status update creates a task row with stage=analysis."""
    payload = _plane_state_change("test-123", "Test task", _in_progress_state())
    response = client.post("/webhook/plane", json=payload)

    assert response.status_code == 200
    assert response.json()["status"] == "accepted"

    # The task must now exist in the DB.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'test-123'")
    assert row is not None
    assert row["stage"] == "analysis"
    assert row["work_item_id"] is not None
    assert "feature/" in row["branch"]


@patch("src.plane_sync.add_comment")
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
@patch("src.plane_sync.fetch_issue_fields", side_effect=[
    ("First task", "This is a detailed description for the first task item"),
    ("Second task", "This is a detailed description for the second task item"),
])
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_webhook_generates_sequential_ids(
    mock_docs, mock_branch, mock_fetch_fields, mock_fetch_seq, mock_add_comment
):
    """Two In Progress transitions receive consecutive IDs (ET-001, ET-002)."""
    # Both requests share the same state object, as in a real status change.
    shared_state = _in_progress_state()
    client.post(
        "/webhook/plane",
        json=_plane_state_change("item-1", "First task", shared_state),
    )
    client.post(
        "/webhook/plane",
        json=_plane_state_change("item-2", "Second task", shared_state),
    )

    db = get_db()
    rows = db.execute("SELECT work_item_id FROM tasks ORDER BY id").fetchall()
    db.close()

    work_item_ids = [row["work_item_id"] for row in rows]
    assert work_item_ids[0] == "ET-001"
    assert work_item_ids[1] == "ET-002"


APPROVED_STATE = "a519a341-dada-4a91-8910-7604f82b79c5"
REJECTED_STATE = "ba958f3c-5db5-461d-8f82-89425e413b97"


@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane.launcher")
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
    """Status-only model: Approved STATUS at stage=analysis advances to architecture.

    A comment never triggers this transition.
    """
    # Point the QG check at the temporary repos directory.
    monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))

    # Seed an analysis task directly (creation no longer makes a task post-PR#11).
    _write_db(
        _INSERT_TASK_WITH_ISSUE,
        ("adv-001", "ET-001", "enduro-trails", "feature/ET-001-x", "analysis", "adv-001"),
    )

    # Lay down the analysis documents so the analysis QG passes.
    work_item_id = "ET-001"
    wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
    wi_dir.mkdir(parents=True)
    analysis_docs = {
        "01-brd.md": "# BRD",
        "02-trz.md": "# TRZ",
        "03-acceptance-criteria.md": "# AC",
        "04-test-plan.yaml": "tests: []",
    }
    for filename, content in analysis_docs.items():
        (wi_dir / filename).write_text(content)

    mock_launcher.launch.return_value = 1

    # Deliver the Approved STATUS change.
    approved = {"id": APPROVED_STATE, "name": "Approved", "group": "completed"}
    response = client.post(
        "/webhook/plane",
        json=_plane_state_change("adv-001", "Advance test", approved),
    )
    assert response.status_code == 200

    # The stage must have moved forward.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'adv-001'")
    assert row["stage"] == "architecture"


@patch("src.webhooks.plane.httpx.get")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_rejected_rolls_back(mock_docs, mock_branch, mock_get):
    """Status-only model: Rejected STATUS rolls the stage back.

    A comment never triggers this; the reason is pulled from the latest comment.
    """

    class _CommentsResponse:
        """Minimal stand-in for the HTTP response that carries the comments."""

        status_code = 200

        @staticmethod
        def json():
            return {"results": [
                {"comment_stripped": "missing ADR", "created_at": "2026-06-03T10:00:00Z"}
            ]}

    mock_get.return_value = _CommentsResponse()

    # Seed an architecture task directly.
    _write_db(
        _INSERT_TASK_WITH_ISSUE,
        ("rej-001", "ET-002", "enduro-trails", "feature/ET-002-x", "architecture", "rej-001"),
    )

    # Deliver the Rejected STATUS change.
    rejected = {"id": REJECTED_STATE, "name": "Rejected", "group": "cancelled"}
    response = client.post(
        "/webhook/plane",
        json=_plane_state_change("rej-001", "Reject test", rejected),
    )
    assert response.status_code == 200

    # The stage must have moved back.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'rej-001'")
    assert row["stage"] == "analysis"


def test_gitea_webhook_push():
    """A push event is accepted."""
    push_event = {
        "ref": "refs/heads/feature/test",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    response = client.post(
        "/webhook/gitea",
        json=push_event,
        headers={"X-Gitea-Event": "push"},
    )
    assert response.status_code == 200
    assert response.json()["status"] == "accepted"


@patch("src.webhooks.gitea.plane_notify_stage")
@patch("src.webhooks.gitea.launcher")
def test_gitea_push_with_adr_advances_stage(mock_launcher, mock_plane_notify):
    """A push containing ADR files at the architecture stage advances to development."""
    mock_launcher.launch.return_value = 1

    # Seed a task sitting at the architecture stage.
    _write_db(
        _INSERT_TASK,
        ("push-001", "ET-010", "enduro-trails", "feature/ET-010-test", "architecture"),
    )

    # Push that adds an ADR file.
    push_event = {
        "ref": "refs/heads/feature/ET-010-test",
        "repository": {"name": "enduro-trails"},
        "commits": [
            {"added": ["docs/work-items/ET-010/06-adr/001-decision.md"], "modified": []}
        ],
    }
    response = client.post(
        "/webhook/gitea",
        json=push_event,
        headers={"X-Gitea-Event": "push"},
    )
    assert response.status_code == 200

    # The stage must have moved forward and Plane must have been notified once.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'push-001'")
    assert row["stage"] == "development"
    mock_plane_notify.assert_called_once()


@patch("src.webhooks.gitea.check_ci_green")
@patch("src.webhooks.gitea.launcher")
def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
    """A successful CI status at the development stage advances to review."""
    mock_ci.return_value = (True, "CI green")
    mock_launcher.launch.return_value = 2

    # Seed a task sitting at the development stage.
    _write_db(
        _INSERT_TASK,
        ("ci-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
    )

    # CI reports success for the task branch.
    status_event = {
        "state": "success",
        "branches": [{"name": "feature/ET-011-test"}],
        "repository": {"name": "enduro-trails"},
    }
    response = client.post(
        "/webhook/gitea",
        json=status_event,
        headers={"X-Gitea-Event": "status"},
    )
    assert response.status_code == 200

    # The stage must have moved forward.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'ci-001'")
    assert row["stage"] == "review"


@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.launcher")
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
    """BUG 6: a CI failure at development is the authoritative QG gate failing.

    It must notify QG failure (not silently suppress) and must NOT advance the stage.
    """
    _write_db(
        _INSERT_TASK,
        ("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
    )

    response = client.post(
        "/webhook/gitea",
        json=_ci_failure_payload(),
        headers={"X-Gitea-Event": "status"},
    )
    assert response.status_code == 200

    # The QG failure was reported for the development stage with check_ci_green,
    # whether those values were passed positionally or by keyword.
    assert mock_notify.called
    positional, keyword = mock_notify.call_args
    passed_values = [*positional, *keyword.values()]
    assert "development" in passed_values
    assert "check_ci_green" in passed_values

    # The stage did NOT advance.
    row = _read_one("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'")
    assert row["stage"] == "development"


def test_gitea_webhook_pr():
    """A pull_request event is accepted."""
    pr_event = {
        "action": "opened",
        "pull_request": {"head": {"ref": "feature/test"}, "number": 1},
        "repository": {"name": "enduro-trails"},
    }
    response = client.post(
        "/webhook/gitea",
        json=pr_event,
        headers={"X-Gitea-Event": "pull_request"},
    )
    assert response.status_code == 200
    assert response.json()["status"] == "accepted"


def test_plane_webhook_event_logged():
    """Incoming Plane events are recorded in the events table."""
    client.post("/webhook/plane", json={
        "event": "test.event",
        "data": {"foo": "bar"}
    })

    logged = _read_one(
        "SELECT * FROM events WHERE event_type = 'test.event'"
    )
    assert logged is not None
    assert logged["source"] == "plane"


# ---------------------------------------------------------------------------
# BUG 7: red CI on development must bounce the task back to the developer
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
# tests: they call handle_ci_status() directly with mocked helpers, so they do
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
# ---------------------------------------------------------------------------


def _ci_failure_payload():
    """Return a Gitea status payload reporting a failed CI run on the ET-011 branch."""
    return {
        "state": "failure",
        "branches": [{"name": "feature/ET-011-test"}],
        "repository": {"name": "enduro-trails"},
    }


def _mock_db_with_retry_count(count):
    """Build a get_db() stand-in whose execute(...).fetchone() yields {"cnt": count}."""
    fake_conn = MagicMock()
    fake_conn.execute.return_value.fetchone.return_value = {"cnt": count}
    return fake_conn


@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_retries_developer_under_limit(
    mock_proj, mock_task, mock_get_db, mock_update_stage,
    mock_enqueue, mock_qg, mock_err,
):
    """retry_count < MAX_DEV_RETRIES: the developer is relaunched, stage untouched."""
    from src.webhooks.gitea import handle_ci_status

    mock_proj.return_value = {"repo": "enduro-trails"}
    mock_task.return_value = {
        "id": 1,
        "stage": "development",
        "work_item_id": "ET-011",
    }
    mock_get_db.return_value = _mock_db_with_retry_count(0)
    mock_enqueue.return_value = 42

    asyncio.run(handle_ci_status(_ci_failure_payload()))

    # The QG failure is still reported, so both the failure and the retry are visible.
    assert mock_qg.called
    # The developer job was enqueued again.
    assert mock_enqueue.called
    assert mock_enqueue.call_args[0][0] == "developer"
    # Nothing was escalated.
    assert not mock_err.called
    # The stage stays on development: the CI-failure path never updates it.
    assert not mock_update_stage.called


@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_escalates_at_limit(
    mock_proj, mock_task, mock_get_db, mock_update_stage,
    mock_enqueue, mock_qg, mock_err,
):
    """retry_count >= MAX_DEV_RETRIES: escalate through notify_error, no relaunch."""
    from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES

    mock_proj.return_value = {"repo": "enduro-trails"}
    mock_task.return_value = {
        "id": 1,
        "stage": "development",
        "work_item_id": "ET-011",
    }
    mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)

    asyncio.run(handle_ci_status(_ci_failure_payload()))

    # The QG failure is still reported.
    assert mock_qg.called
    # The developer is NOT enqueued again once the cap is reached.
    assert not mock_enqueue.called
    # The escalation message mentions the CI failure.
    assert mock_err.called
    escalation_text = " ".join(map(str, mock_err.call_args[0]))
    assert "Max developer retries" in escalation_text
    assert "after CI failure" in escalation_text
    # The stage is untouched.
    assert not mock_update_stage.called


# ---------------------------------------------------------------------------
# BUG 8 (second door): a merged-PR webhook must NOT fake-complete a task that is
# still in the deploy stage. On `deploy`, done is gated by the deployer's verdict
# (check_deploy_status via advance_stage), not by the merge event. For every
# other stage the merge->done behaviour is preserved. Pure-logic tests: they
# call handle_pr() directly with mocked helpers (no HMAC barrier).
# ---------------------------------------------------------------------------


def _merged_pr_payload(branch="feature/ET-012-x"):
    """Return a Gitea pull_request payload for PR #7 closed as merged from `branch`."""
    pull_request = {
        "merged": True,
        "number": 7,
        "head": {"ref": branch},
    }
    return {
        "action": "closed",
        "pull_request": pull_request,
        "repository": {"name": "enduro-trails"},
    }


@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_deploy_stage_does_not_set_done(
    mock_proj, mock_task, mock_update_stage, mock_notify,
):
    """FIX 1: a merge at the deploy stage is ignored; done is gated by the deployer verdict."""
    from src.webhooks.gitea import handle_pr

    mock_proj.return_value = {"repo": "enduro-trails"}
    mock_task.return_value = {
        "id": 1,
        "stage": "deploy",
        "work_item_id": "ET-012",
    }

    asyncio.run(handle_pr(_merged_pr_payload()))

    # The merge-driven done path must NOT run on deploy.
    assert not mock_update_stage.called
    assert not mock_notify.called


@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_non_deploy_stage_sets_done(
    mock_proj, mock_task, mock_update_stage, mock_notify,
):
    """FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
    from src.webhooks.gitea import handle_pr

    mock_proj.return_value = {"repo": "enduro-trails"}
    mock_task.return_value = {
        "id": 2,
        "stage": "review",
        "work_item_id": "ET-013",
    }

    asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))

    # Non-deploy stages still get the merge-driven done.
    mock_update_stage.assert_called_once_with(2, "done")
    assert mock_notify.called