"""ORCH-5 (M-7): webhook delivery de-duplication tests. A retried/replayed webhook delivery must be processed exactly once. We mock enqueue_job (imported into the gitea/plane module namespaces) and assert its call_count does not grow on a repeat. HMAC is bypassed here by forcing the webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature guarantee by re-enabling the secret. """ import os import tempfile from unittest.mock import patch, AsyncMock import pytest # Override DB path + project registry BEFORE importing app (same pattern as # tests/test_webhooks.py). _test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ["ORCH_GITEA_TOKEN"] = "test-token" os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" os.environ["ORCH_GITEA_OWNER"] = "admin" os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails" os.environ["ORCH_PROJECTS_JSON"] = ( '[{"plane_project_id": "proj-1", "repo": "enduro-trails", ' '"work_item_prefix": "ET", "name": "enduro-trails"}]' ) from fastapi.testclient import TestClient # noqa: E402 from src.main import app # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import db as db_module # noqa: E402 from src.webhooks import gitea as gitea_mod # noqa: E402 from src.webhooks import plane as plane_mod # noqa: E402 from src import projects as projects_mod # noqa: E402 @pytest.fixture(autouse=True) def setup_db(monkeypatch): # settings is a process-wide singleton; another test module may have fixed # settings.db_path to its own file at import time. get_db() reads it live, so # pin it to OUR db for the duration of each test here. monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False) if os.path.exists(_test_db): os.unlink(_test_db) init_db() yield if os.path.exists(_test_db): os.unlink(_test_db) @pytest.fixture(autouse=True) def proj_registry(): """Pin the shared project registry to proj-1/enduro-trails. The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton built at import; test_projects.py rebuilds it via reload_projects(), which can leave it on the built-in default where proj-1 is unknown -> ORCH-6 would ignore our fixtures. Force ours for each test, then rebuild after. """ os.environ["ORCH_PROJECTS_JSON"] = ( '[{"plane_project_id": "proj-1", "repo": "enduro-trails", ' '"work_item_prefix": "ET", "name": "enduro-trails"}]' ) projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"] projects_mod.reload_projects() yield projects_mod.reload_projects() @pytest.fixture(autouse=True) def no_hmac(monkeypatch): """Bypass HMAC so dedup behavior (not signing) is under test. settings is shared, so override the secret on the module-level settings that each verify_* function reads. """ monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False) monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False) yield client = TestClient(app) def _events_count(): conn = get_db() n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] conn.close() return n # --------------------------------------------------------------------------- # Migration # --------------------------------------------------------------------------- def test_migration_adds_delivery_id_and_index(): """events has delivery_id + a partial unique index idx_events_delivery.""" conn = get_db() cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()] idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()] conn.close() assert "delivery_id" in cols assert "idx_events_delivery" in idxs def test_migration_on_old_db_without_column_does_not_crash(): """init_db() over a pre-existing events table WITHOUT delivery_id is safe.""" if os.path.exists(_test_db): os.unlink(_test_db) import sqlite3 conn = sqlite3.connect(_test_db) # Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id. conn.executescript( """ CREATE TABLE events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now')), source TEXT NOT NULL, event_type TEXT NOT NULL, payload TEXT NOT NULL, processed INTEGER DEFAULT 0 ); INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}'); INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}'); """ ) conn.commit() conn.close() # Should add the column + index without raising and keep the legacy rows. init_db() conn = get_db() cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()] n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] conn.close() assert "delivery_id" in cols assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist # --------------------------------------------------------------------------- # Gitea dedup # --------------------------------------------------------------------------- @patch.object(gitea_mod, "enqueue_job") def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue): """Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}.""" # Task at architecture so the ADR push would enqueue. conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", ("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"), ) conn.commit() conn.close() body = { "ref": "refs/heads/feature/ET-100-x", "repository": {"name": "enduro-trails"}, "commits": [ {"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []} ], } hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"} r1 = client.post("/webhook/gitea", json=body, headers=hdrs) assert r1.status_code == 200 assert r1.json()["status"] == "accepted" assert mock_enqueue.call_count == 1 assert _events_count() == 1 # Same delivery id again -> duplicate, no new enqueue, no new event row. r2 = client.post("/webhook/gitea", json=body, headers=hdrs) assert r2.status_code == 200 assert r2.json()["status"] == "duplicate" assert mock_enqueue.call_count == 1 assert _events_count() == 1 @patch.object(gitea_mod, "enqueue_job") def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue): body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []} r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"}) r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"}) assert r1.json()["status"] == "accepted" assert r2.json()["status"] == "accepted" assert _events_count() == 2 def test_gitea_fallback_hash_when_no_delivery_header(): """No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate.""" body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []} r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"}) r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"}) assert r1.json()["status"] == "accepted" assert r2.json()["status"] == "duplicate" assert _events_count() == 1 # --------------------------------------------------------------------------- # Plane dedup # --------------------------------------------------------------------------- @patch.object(plane_mod, "enqueue_job") @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue): """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate. Feature 1: the pipeline now starts on a status change to In Progress, not on creation, so this drives the dedup test with an 'issue updated' event. """ IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967" body = { "event": "issue", "action": "updated", "data": { "id": "pd-001", "name": "Dedup plane task", "description_stripped": "A sufficiently long description for QG-0 to pass.", "project": "proj-1", "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"}, }, } r1 = client.post("/webhook/plane", json=body) assert r1.status_code == 200 assert r1.json()["status"] == "accepted" assert mock_enqueue.call_count == 1 assert _events_count() == 1 r2 = client.post("/webhook/plane", json=body) assert r2.status_code == 200 assert r2.json()["status"] == "duplicate" assert mock_enqueue.call_count == 1 # not re-enqueued assert _events_count() == 1 @patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) @patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch): """ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}.""" body = { "event": "work_item.created", "data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"}, } r1 = client.post("/webhook/plane", json=body) assert r1.status_code == 200 assert r1.json()["status"] == "ignored" # Event WAS logged (dedup happens before the project filter), so a retry of the # SAME body is a duplicate, not re-evaluated. assert _events_count() == 1 r2 = client.post("/webhook/plane", json=body) assert r2.json()["status"] == "duplicate" assert _events_count() == 1 # --------------------------------------------------------------------------- # HMAC still guarded (acceptance #4) — independent of the dedup path # --------------------------------------------------------------------------- def test_gitea_invalid_signature_still_401(monkeypatch): monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False) r = client.post( "/webhook/gitea", json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []}, headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"}, ) assert r.status_code == 401 def test_plane_invalid_signature_still_401(monkeypatch): monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False) r = client.post( "/webhook/plane", json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}}, headers={"X-Plane-Signature": "deadbeef"}, ) assert r.status_code == 401