"""Feature 1: pipeline starts on status -> In Progress, not on creation. * work_item.created / issue created -> NO task, NO branch, NO analyst. * issue updated -> In Progress (from backlog) -> task created + analyst enqueued. * a second In Progress update while the agent is busy -> NO duplicate, NO restart (busy-guard). * In Progress returned from Needs Input (agent idle) -> agent RELAUNCHED. launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient. """ import os import tempfile _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_trigger.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import pytest # noqa: E402 from unittest.mock import patch, AsyncMock # noqa: E402 from fastapi.testclient import TestClient # noqa: E402 from src.main import app # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import projects as P # noqa: E402 from src.projects import reload_projects # noqa: E402 ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c" IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122" client = TestClient(app) @pytest.fixture(autouse=True) def setup(monkeypatch): monkeypatch.setattr(P.settings, "db_path", _test_db) import src.db as _db monkeypatch.setattr(_db.settings, "db_path", _test_db) # ORCH-088: this suite asserts the branch is cut DURING start_pipeline. With the # serial gate ON (default) the cut is deferred to the analyst-job claim, so pin # to the kill-switch-off (legacy) path — branch timing is out of scope here # (covered by test_serial_gate_branch). monkeypatch.setattr(_db.settings, "serial_gate_enabled", False, raising=False) if os.path.exists(_test_db): os.unlink(_test_db) init_db() monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True) registry_json = ( f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",' f' "work_item_prefix": "ET", "name": "enduro-trails"}}]' ) monkeypatch.setattr(P.settings, "projects_json", registry_json) reload_projects() yield reload_projects() if os.path.exists(_test_db): os.unlink(_test_db) def _created(plane_id="st-created"): return client.post("/webhook/plane", json={ "event": "issue", "action": "created", "data": { "id": plane_id, "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": BACKLOG, "name": "Backlog", "group": "backlog"}, }, }) def _to_in_progress(plane_id="st-1"): return client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": plane_id, "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG}, }) def _count(plane_id): conn = get_db() n = conn.execute("SELECT COUNT(*) FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()[0] conn.close() return n # --------------------------------------------------------------------------- # @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) def test_created_does_not_start_pipeline(mock_branch, mock_docs, mock_enqueue): resp = _created("st-created") assert resp.status_code == 200 assert resp.json()["status"] == "accepted" # No task, no branch, no analyst enqueue. assert _count("st-created") == 0 mock_branch.assert_not_called() mock_enqueue.assert_not_called() @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqueue): mock_enqueue.return_value = 1 resp = _to_in_progress("st-1") assert resp.status_code == 200 assert resp.json()["status"] == "accepted" assert _count("st-1") == 1 conn = get_db() task = conn.execute("SELECT * FROM tasks WHERE plane_id='st-1'").fetchone() conn.close() assert task["stage"] == "analysis" assert task["repo"] == "enduro-trails" mock_branch.assert_called_once() # analyst enqueued exactly once assert mock_enqueue.call_count == 1 assert mock_enqueue.call_args.args[0] == "analyst" @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) def test_repeat_in_progress_while_job_active_does_not_relaunch( mock_seq, mock_branch, mock_docs, mock_enqueue ): """Status-only model busy-guard: a duplicate In Progress webhook that arrives while the stage agent still has a queued/running job must NOT relaunch the agent (no double launch). """ mock_enqueue.return_value = 1 _to_in_progress("st-2") assert _count("st-2") == 1 assert mock_enqueue.call_count == 1 # enqueue_job is mocked above, so no real job row exists. Seed an ACTIVE # (queued) job for the task so has_active_job_for_task() reports the agent as # busy -> the busy-guard fires. conn = get_db() task_id = conn.execute( "SELECT id FROM tasks WHERE plane_id='st-2'" ).fetchone()[0] conn.execute( "INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'queued')", ("analyst", "enduro-trails", task_id), ) conn.commit() conn.close() # Second In Progress update. DISTINCT body (different activity old_value) so # webhook dedup does NOT short-circuit it — this exercises the busy-guard in # handle_status_start, not the delivery-dedup layer. resp = client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": "st-2", "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "some-other-state"}, }) assert resp.status_code == 200 assert _count("st-2") == 1 # still exactly one task assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued (busy-guard) @patch("src.webhooks.plane.add_comment", create=True) @patch("src.webhooks.plane.enqueue_job") @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.plane_sync.fetch_issue_sequence_id", return_value=5) def test_inprogress_from_needs_input_relaunches_analyst( mock_seq, mock_branch, mock_docs, mock_enqueue, mock_comment ): """Status-only answer-to-questions flow: an existing analysis task whose agent is IDLE (no active job — it went to Needs Input) is returned to In Progress -> the analyst is relaunched to read Slava's fresh comments. + double-webhook protection: a second In Progress while the relaunch job is active does NOT relaunch again. """ mock_enqueue.return_value = 1 # First In Progress: starts the pipeline (creates task + enqueues analyst). _to_in_progress("st-ni") assert _count("st-ni") == 1 assert mock_enqueue.call_count == 1 # The analyst finished and asked questions -> Needs Input. In our model that # means NO active job for the task (enqueue_job is mocked, so no job row). conn = get_db() task_id = conn.execute( "SELECT id FROM tasks WHERE plane_id='st-ni'" ).fetchone()[0] has_job = conn.execute( "SELECT COUNT(*) FROM jobs WHERE task_id=? AND status IN ('queued','running')", (task_id,), ).fetchone()[0] conn.close() assert has_job == 0 # agent idle # Slava answers + returns the issue to In Progress (distinct body). resp = client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": "st-ni", "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "needs-input"}, }) assert resp.status_code == 200 assert _count("st-ni") == 1 # no duplicate task assert mock_enqueue.call_count == 2 # analyst RELAUNCHED assert mock_enqueue.call_args.args[0] == "analyst" # Seed an active job for the relaunch, then a SECOND In Progress webhook must # NOT relaunch again (busy-guard against double webhooks). conn = get_db() conn.execute( "INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'running')", ("analyst", "enduro-trails", task_id), ) conn.commit() conn.close() resp2 = client.post("/webhook/plane", json={ "event": "issue", "action": "updated", "data": { "id": "st-ni", "name": "A valid backlog item title", "description_stripped": "A sufficiently long description for QG-0.", "project": ENDURO_PLANE_ID, "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, "activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "x-y-z"}, }) assert resp2.status_code == 200 assert mock_enqueue.call_count == 2 # still 2 — busy-guard held