test(webhook): cover delivery dedup + migration safety (M-7)
This commit is contained in:
277
tests/test_webhook_dedup.py
Normal file
277
tests/test_webhook_dedup.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
|
||||
|
||||
A retried/replayed webhook delivery must be processed exactly once. We mock
|
||||
enqueue_job (imported into the gitea/plane module namespaces) and assert its
|
||||
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
|
||||
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
|
||||
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
|
||||
guarantee by re-enabling the secret.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
# Override DB path + project registry BEFORE importing app (same pattern as
|
||||
# tests/test_webhooks.py).
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
||||
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
||||
os.environ["ORCH_GITEA_OWNER"] = "admin"
|
||||
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
|
||||
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||
)
|
||||
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import db as db_module # noqa: E402
|
||||
from src.webhooks import gitea as gitea_mod # noqa: E402
|
||||
from src.webhooks import plane as plane_mod # noqa: E402
|
||||
from src import projects as projects_mod # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db(monkeypatch):
|
||||
# settings is a process-wide singleton; another test module may have fixed
|
||||
# settings.db_path to its own file at import time. get_db() reads it live, so
|
||||
# pin it to OUR db for the duration of each test here.
|
||||
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
yield
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def proj_registry():
|
||||
"""Pin the shared project registry to proj-1/enduro-trails.
|
||||
|
||||
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
|
||||
built at import; test_projects.py rebuilds it via reload_projects(), which can
|
||||
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
|
||||
ignore our fixtures. Force ours for each test, then rebuild after.
|
||||
"""
|
||||
os.environ["ORCH_PROJECTS_JSON"] = (
|
||||
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
||||
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
||||
)
|
||||
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
|
||||
projects_mod.reload_projects()
|
||||
yield
|
||||
projects_mod.reload_projects()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def no_hmac(monkeypatch):
|
||||
"""Bypass HMAC so dedup behavior (not signing) is under test.
|
||||
|
||||
settings is shared, so override the secret on the module-level settings that
|
||||
each verify_* function reads.
|
||||
"""
|
||||
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
|
||||
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
|
||||
yield
|
||||
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def _events_count():
|
||||
conn = get_db()
|
||||
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_migration_adds_delivery_id_and_index():
|
||||
"""events has delivery_id + a partial unique index idx_events_delivery."""
|
||||
conn = get_db()
|
||||
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
|
||||
conn.close()
|
||||
assert "delivery_id" in cols
|
||||
assert "idx_events_delivery" in idxs
|
||||
|
||||
|
||||
def test_migration_on_old_db_without_column_does_not_crash():
|
||||
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
import sqlite3
|
||||
conn = sqlite3.connect(_test_db)
|
||||
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
|
||||
conn.executescript(
|
||||
"""
|
||||
CREATE TABLE events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT DEFAULT (datetime('now')),
|
||||
source TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
processed INTEGER DEFAULT 0
|
||||
);
|
||||
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
|
||||
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Should add the column + index without raising and keep the legacy rows.
|
||||
init_db()
|
||||
|
||||
conn = get_db()
|
||||
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
||||
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
conn.close()
|
||||
assert "delivery_id" in cols
|
||||
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
|
||||
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
|
||||
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
|
||||
# Task at architecture so the ADR push would enqueue.
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
body = {
|
||||
"ref": "refs/heads/feature/ET-100-x",
|
||||
"repository": {"name": "enduro-trails"},
|
||||
"commits": [
|
||||
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
|
||||
],
|
||||
}
|
||||
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
|
||||
|
||||
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
# Same delivery id again -> duplicate, no new enqueue, no new event row.
|
||||
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
||||
assert r2.status_code == 200
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
|
||||
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
|
||||
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||
r1 = client.post("/webhook/gitea", json=body,
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
|
||||
r2 = client.post("/webhook/gitea", json=body,
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert r2.json()["status"] == "accepted"
|
||||
assert _events_count() == 2
|
||||
|
||||
|
||||
def test_gitea_fallback_hash_when_no_delivery_header():
|
||||
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
|
||||
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
||||
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plane dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(plane_mod, "enqueue_job")
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
|
||||
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
|
||||
body = {
|
||||
"event": "work_item.created",
|
||||
"data": {
|
||||
"id": "pd-001",
|
||||
"name": "Dedup plane task",
|
||||
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
||||
"project": "proj-1",
|
||||
},
|
||||
}
|
||||
r1 = client.post("/webhook/plane", json=body)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "accepted"
|
||||
assert mock_enqueue.call_count == 1
|
||||
assert _events_count() == 1
|
||||
|
||||
r2 = client.post("/webhook/plane", json=body)
|
||||
assert r2.status_code == 200
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert mock_enqueue.call_count == 1 # not re-enqueued
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
|
||||
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
|
||||
body = {
|
||||
"event": "work_item.created",
|
||||
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
|
||||
}
|
||||
r1 = client.post("/webhook/plane", json=body)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["status"] == "ignored"
|
||||
# Event WAS logged (dedup happens before the project filter), so a retry of the
|
||||
# SAME body is a duplicate, not re-evaluated.
|
||||
assert _events_count() == 1
|
||||
r2 = client.post("/webhook/plane", json=body)
|
||||
assert r2.json()["status"] == "duplicate"
|
||||
assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HMAC still guarded (acceptance #4) — independent of the dedup path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_gitea_invalid_signature_still_401(monkeypatch):
|
||||
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
|
||||
r = client.post(
|
||||
"/webhook/gitea",
|
||||
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
|
||||
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_plane_invalid_signature_still_401(monkeypatch):
|
||||
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
|
||||
r = client.post(
|
||||
"/webhook/plane",
|
||||
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
|
||||
headers={"X-Plane-Signature": "deadbeef"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
Reference in New Issue
Block a user