"""Feature 1: pipeline starts on status -> In Progress, not on creation. * work_item.created / issue created -> NO task, NO branch, NO analyst. * issue updated -> In Progress (from backlog) -> task created + analyst enqueued. * a second In Progress update for the same issue -> NO duplicate, NO restart (protects handle_comment, which also flips issues to In Progress). launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient. """ import os import tempfile _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import pytest # noqa: E402 from unittest.mock import patch, AsyncMock # noqa: E402 from fastapi.testclient import TestClient # noqa: E402 from src.main import app # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import projects as P # noqa: E402 from src.projects import reload_projects # noqa: E402 ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c" IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122" client = TestClient(app) @pytest.fixture(autouse=True) def setup(monkeypatch): monkeypatch.setattr(P.settings, "db_path", _test_db) import src.db as _db monkeypatch.setattr(_db.settings, "db_path", _test_db) if os.path.exists(_test_db): os.unlink(_test_db) init_db() monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True) registry_json = ( f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",' f' "work_item_prefix": "ET", "name": "enduro-trails"}}]' ) monkeypatch.setattr(P.settings, "projects_json", registry_json) reload_projects() yield reload_projects() if os.path.exists(_test_db): os.unlink(_test_db) def _created(plane_id="st-created"): return client.post("/webhook/plane", json={ "event": "issue", "action": "created", "data": { "id": plane_id, "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"}, }, }) def _to_in_progress(plane_id="st-1"): return client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": plane_id, "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG}, }) def _count(plane_id): conn = get_db() n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0] conn.close() return n # --------------------------------------------------------------------------- # @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue): resp = _created("st-created") assert resp.status_code == 200 assert resp.json()["status"] == "accepted" # No task, no branch, no analyst enqueue. assert _count("st-created") == 0 mock_branch.assert_not_called() mock_enqueue.assert_not_called() @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue): mock_enqueue.return_value = 1 resp = _to_in_progress("st-1") assert resp.status_code == 200 assert resp.json()["status"] == "accepted" assert _count("st-1") == 1 conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone() conn.close() assert task["stage"] == "analysis" assert task["repo"] == "enduro-trails" mock_branch.assert_called_once() # analyst enqueued exactly once assert mock_enqueue.call_count == 1 assert mock_enqueue.call_args.args[0] == "analyst" @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue): mock_enqueue.return_value = 1 _to_in_progress("st-2") assert _count("st-2") == 1 assert mock_enqueue.call_count == 1 # Second In Progress update (e.g. handle_comment re-set the status). Use a # DISTINCT body (different activity old_value) so webhook dedup does NOT # short-circuit it — this exercises the existing-task idempotency guard in # handle_status_start, not the delivery-dedup layer. resp = client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": "st-2", "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"}, }) assert resp.status_code == 200 assert _count("st-2") == 1 # still exactly one task assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued