Merge pull request 'ORCH-5: webhook delivery dedup (M-7)' (#6) from feature/ORCH-5-webhook-dedup into main
This commit was merged in pull request #6.
This commit is contained in:
38
src/db.py
38
src/db.py
@@ -67,6 +67,17 @@ def init_db():
|
||||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
|
||||
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
|
||||
# (which have NULL delivery_id) never collide with each other. Restart-safe:
|
||||
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
|
||||
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
|
||||
_ensure_column(conn, "events", "delivery_id", "TEXT")
|
||||
conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
||||
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-5 (M-7): idempotent webhook event logging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def insert_event_dedup(
    source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
    """Idempotently log a webhook event keyed by delivery_id.

    Inserts one row into ``events`` with INSERT OR IGNORE and relies on the
    partial UNIQUE index idx_events_delivery (``WHERE delivery_id IS NOT NULL``)
    to reject a repeated delivery.

    Args:
        source: Origin of the webhook (the handlers pass "gitea" / "plane").
        event_type: Provider event name stored alongside the payload.
        payload: Raw request body as text.
        delivery_id: Stable per-delivery key. A blank or None id is stored as
            NULL: the event is still logged, but it is never treated as a
            duplicate because the partial index does not cover NULL rows.

    Returns:
        True if a NEW row was inserted (caller should dispatch the event) and
        False if this delivery_id was already present (a duplicate delivery ->
        caller must skip dispatch/enqueue). rowcount==1 means the row was
        actually inserted.
    """
    # "" (or whitespace) is a real, non-NULL value for the unique index: without
    # this normalization every blank-id event after the first one would collide
    # on the same key and be silently dropped as a "duplicate".
    has_key = bool(delivery_id) and bool(delivery_id.strip())
    dedup_key = delivery_id if has_key else None

    conn = get_db()
    try:
        # NOTE(review): OR IGNORE skips the row on ANY ignorable constraint
        # violation (e.g. NOT NULL), not only on the delivery_id index, so such
        # a row would also be reported as a duplicate -- confirm callers never
        # pass None for the other columns.
        cur = conn.execute(
            "INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
            "VALUES (?, ?, ?, ?)",
            (source, event_type, payload, dedup_key),
        )
        conn.commit()
        return cur.rowcount == 1
    finally:
        conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1 (F-2b): job queue helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
52
src/webhooks/_dedup.py
Normal file
52
src/webhooks/_dedup.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
|
||||
|
||||
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
|
||||
manual replay. Without idempotency a retried delivery re-enters the pipeline and
|
||||
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
|
||||
repo). This module computes a stable per-delivery id so the webhook handlers can
|
||||
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
|
||||
|
||||
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
|
||||
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
|
||||
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
|
||||
body (a retried identical body yields the same hash).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
|
||||
|
||||
def _sha256_hex(*parts: str) -> str:
    """Return the hex SHA-256 digest of *parts* hashed back to back.

    Each part is encoded as UTF-8 (unencodable characters are substituted via
    the "replace" error handler) and the encoded pieces are fed to the hash in
    order with no separator between them.
    """
    encoded = b"".join(piece.encode("utf-8", "replace") for piece in parts)
    return hashlib.sha256(encoded).hexdigest()
|
||||
|
||||
|
||||
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
    """Build the ``gitea:``-prefixed delivery id for a Gitea webhook request.

    The whitespace-stripped ``X-Gitea-Delivery`` header is used when it is
    non-empty. Otherwise the id is the sha256 of "gitea" + event_type + body
    (body decoded as UTF-8 with replacement), so a retried identical body maps
    to the same id even without the header.

    Args:
        headers: Mapping-like object exposing ``.get(name)``.
        event_type: Gitea event name; None/empty is hashed as "".
        body: Raw request body.

    Returns:
        ``"gitea:<header value or sha256 hex>"``.
    """
    header_value = headers.get("X-Gitea-Delivery")
    token = header_value.strip() if header_value else ""
    if token:
        return f"gitea:{token}"

    # No usable header: derive a stable id from the request content instead.
    body_text = body.decode("utf-8", "replace")
    digest = _sha256_hex("gitea", event_type or "", body_text)
    return f"gitea:{digest}"
|
||||
|
||||
|
||||
def plane_delivery_id(headers, body: bytes) -> str:
    """Build the ``plane:``-prefixed delivery id for a Plane webhook request.

    The first truthy value among the ``X-Plane-Delivery`` and
    ``X-Hook-Delivery`` headers is taken and stripped. If that leaves nothing,
    the id falls back to sha256("plane" + body), with the body decoded as UTF-8
    with replacement, so a retried identical body yields the same id.

    Args:
        headers: Mapping-like object exposing ``.get(name)``.
        body: Raw request body.

    Returns:
        ``"plane:<header value or sha256 hex>"``.
    """
    candidate = headers.get("X-Plane-Delivery")
    if not candidate:
        candidate = headers.get("X-Hook-Delivery")

    token = candidate.strip() if candidate else ""
    if not token:
        # Neither header gave a usable value: hash the content instead.
        token = _sha256_hex("plane", body.decode("utf-8", "replace"))
    return f"plane:{token}"
|
||||
@@ -10,7 +10,14 @@ import httpx
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..db import (
|
||||
get_db,
|
||||
get_task_by_repo_branch,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import gitea_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage
|
||||
from ..qg.checks import check_ci_green, check_review_approved
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
|
||||
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
|
||||
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
|
||||
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
|
||||
# Runs AFTER HMAC verification above.
|
||||
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("gitea", event_type, body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
delivery_id = gitea_delivery_id(request.headers, event_type, body)
|
||||
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
if event_type == "push":
|
||||
await handle_push(payload)
|
||||
|
||||
@@ -15,7 +15,9 @@ from ..db import (
|
||||
get_next_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import plane_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("plane", payload.get("event", "unknown"), body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
|
||||
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
|
||||
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
|
||||
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
|
||||
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
|
||||
# project still falls through to the filter below and returns {"status":"ignored"}.
|
||||
event_type = payload.get("event", "unknown")
|
||||
delivery_id = plane_delivery_id(request.headers, body)
|
||||
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
event = payload.get("event")
|
||||
action = payload.get("action", "")
|
||||
|
||||
277
tests/test_webhook_dedup.py
Normal file
277
tests/test_webhook_dedup.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
|
||||
|
||||
A retried/replayed webhook delivery must be processed exactly once. We mock
|
||||
enqueue_job (imported into the gitea/plane module namespaces) and assert its
|
||||
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
|
||||
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
|
||||
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
|
||||
guarantee by re-enabling the secret.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
# The environment must be in place BEFORE the app is imported below (same
# pattern as tests/test_webhooks.py): DB path, repos dir, tokens, owner/repo
# defaults and the project registry.
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ.update(
    {
        "ORCH_DB_PATH": _test_db,
        "ORCH_REPOS_DIR": tempfile.gettempdir(),
        "ORCH_GITEA_TOKEN": "test-token",
        "ORCH_PLANE_API_TOKEN": "test-token",
        "ORCH_GITEA_OWNER": "admin",
        "ORCH_DEFAULT_REPO": "enduro-trails",
        "ORCH_PROJECTS_JSON": (
            '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
            '"work_item_prefix": "ET", "name": "enduro-trails"}]'
        ),
    }
)
|
||||
|
||||
from fastapi.testclient import TestClient # noqa: E402
|
||||
from src.main import app # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import db as db_module # noqa: E402
|
||||
from src.webhooks import gitea as gitea_mod # noqa: E402
|
||||
from src.webhooks import plane as plane_mod # noqa: E402
|
||||
from src import projects as projects_mod # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised database at ``_test_db``.

    settings is a process-wide singleton and another test module may have set
    settings.db_path to its own file at import time. get_db() reads it live, so
    it is pinned to OUR db for the duration of each test. The db file is
    removed before init_db() and again after the test.
    """

    def _remove_db_file():
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _remove_db_file()
    init_db()
    yield
    _remove_db_file()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def proj_registry():
    """Pin the shared project registry to proj-1/enduro-trails for each test.

    The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
    built at import; test_projects.py rebuilds it via reload_projects(), which
    can leave it on the built-in default where proj-1 is unknown -> ORCH-6
    would ignore our fixtures. The env var and settings.projects_json are set
    to our registry and the registry is reloaded before the test;
    reload_projects() is called once more after it.
    """
    registry_json = (
        '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
        '"work_item_prefix": "ET", "name": "enduro-trails"}]'
    )
    os.environ["ORCH_PROJECTS_JSON"] = registry_json
    projects_mod.settings.projects_json = registry_json
    projects_mod.reload_projects()
    yield
    projects_mod.reload_projects()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
    """Blank both webhook secrets so dedup behavior (not signing) is under test.

    settings is shared, so the secret is overridden on the module-level
    settings object that each webhook module reads. monkeypatch undoes the
    override after the test.
    """
    secret_overrides = (
        (gitea_mod, "gitea_webhook_secret"),
        (plane_mod, "plane_webhook_secret"),
    )
    for module, secret_attr in secret_overrides:
        monkeypatch.setattr(module.settings, secret_attr, "", raising=False)
    yield
|
||||
|
||||
|
||||
# Module-level FastAPI test client used by every test in this file to POST
# webhook requests to the app.
client = TestClient(app)
|
||||
|
||||
|
||||
def _events_count():
    """Return the number of rows currently in the events table."""
    conn = get_db()
    try:
        return conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    finally:
        # Close even when the query raises, so a failing test does not leave
        # an open connection on the db file the fixtures delete afterwards.
        conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_migration_adds_delivery_id_and_index():
    """events has delivery_id + a partial unique index idx_events_delivery."""
    conn = get_db()
    try:
        cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
        # PRAGMA index_list rows are (seq, name, unique, origin, partial).
        idxs = {r[1]: r for r in conn.execute("PRAGMA index_list(events)").fetchall()}
    finally:
        conn.close()

    assert "delivery_id" in cols
    assert "idx_events_delivery" in idxs

    # The name alone is not enough: dedup depends on the index being UNIQUE,
    # and legacy NULL rows can only coexist because it is PARTIAL.
    delivery_idx = idxs["idx_events_delivery"]
    assert delivery_idx[2] == 1  # unique
    assert delivery_idx[4] == 1  # partial
|
||||
|
||||
|
||||
def test_migration_on_old_db_without_column_does_not_crash():
    """init_db() over a pre-existing events table WITHOUT delivery_id is safe.

    Builds an old-shape database by hand, runs the migration, and checks that
    the column and the idx_events_delivery index were added while the legacy
    rows were kept.
    """
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    import sqlite3

    conn = sqlite3.connect(_test_db)
    # Old-shape events table (no delivery_id) + two legacy rows.
    conn.executescript(
        """
        CREATE TABLE events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT DEFAULT (datetime('now')),
            source TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            processed INTEGER DEFAULT 0
        );
        INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
        INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
        """
    )
    conn.commit()
    conn.close()

    # Should add the column + index without raising and keep the legacy rows.
    init_db()

    conn = get_db()
    try:
        cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
        idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
        n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    finally:
        conn.close()

    assert "delivery_id" in cols
    # The index must also be created on the migrated table, not only the column.
    assert "idx_events_delivery" in idxs
    assert n == 2  # legacy NULL-delivery rows preserved, partial index lets them coexist
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
    """Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
    # Seed a task at the architecture stage so the ADR push would enqueue.
    task_row = ("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture")
    conn = get_db()
    conn.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
        "VALUES (?, ?, ?, ?, ?)",
        task_row,
    )
    conn.commit()
    conn.close()

    adr_commit = {"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
    push_payload = {
        "ref": "refs/heads/feature/ET-100-x",
        "repository": {"name": "enduro-trails"},
        "commits": [adr_commit],
    }
    request_headers = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}

    # Same delivery id sent twice: the first is accepted, the repeat is a
    # duplicate. Either way there is exactly one enqueue and one event row.
    for expected_status in ("accepted", "duplicate"):
        response = client.post("/webhook/gitea", json=push_payload, headers=request_headers)
        assert response.status_code == 200
        assert response.json()["status"] == expected_status
        assert mock_enqueue.call_count == 1
        assert _events_count() == 1
|
||||
|
||||
|
||||
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
    """The same body under two different X-Gitea-Delivery ids is accepted twice."""
    push_payload = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    responses = [
        client.post(
            "/webhook/gitea",
            json=push_payload,
            headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": guid},
        )
        for guid in ("guid-1", "guid-2")
    ]
    for response in responses:
        assert response.json()["status"] == "accepted"
    assert _events_count() == 2
|
||||
|
||||
|
||||
def test_gitea_fallback_hash_when_no_delivery_header():
    """No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
    push_payload = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    event_only_headers = {"X-Gitea-Event": "push"}

    first = client.post("/webhook/gitea", json=push_payload, headers=event_only_headers)
    repeat = client.post("/webhook/gitea", json=push_payload, headers=event_only_headers)

    observed = [first.json()["status"], repeat.json()["status"]]
    assert observed == ["accepted", "duplicate"]
    assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plane dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
    """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
    work_item = {
        "id": "pd-001",
        "name": "Dedup plane task",
        "description_stripped": "A sufficiently long description for QG-0 to pass.",
        "project": "proj-1",
    }
    plane_payload = {"event": "work_item.created", "data": work_item}

    # No delivery header is sent, so both requests rely on the body-hash id.
    # The enqueue count and the event row count must stay at 1 after the repeat.
    for expected_status in ("accepted", "duplicate"):
        response = client.post("/webhook/plane", json=plane_payload)
        assert response.status_code == 200
        assert response.json()["status"] == expected_status
        assert mock_enqueue.call_count == 1
        assert _events_count() == 1
|
||||
|
||||
|
||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
    """ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
    unknown_item = {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"}
    plane_payload = {"event": "work_item.created", "data": unknown_item}

    first = client.post("/webhook/plane", json=plane_payload)
    assert first.status_code == 200
    assert first.json()["status"] == "ignored"
    # Dedup runs before the project filter, so the event WAS logged...
    assert _events_count() == 1

    # ...and a retry of the SAME body is a duplicate, not re-evaluated.
    repeat = client.post("/webhook/plane", json=plane_payload)
    assert repeat.json()["status"] == "duplicate"
    assert _events_count() == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HMAC still guarded (acceptance #4) — independent of the dedup path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_gitea_invalid_signature_still_401(monkeypatch):
    """With a Gitea secret configured, a bad X-Gitea-Signature is rejected with 401."""
    # Re-enable the secret that the autouse no_hmac fixture blanked.
    monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)

    push_payload = {
        "ref": "refs/heads/feature/x",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    bad_signature_headers = {"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"}

    response = client.post("/webhook/gitea", json=push_payload, headers=bad_signature_headers)
    assert response.status_code == 401
|
||||
|
||||
|
||||
def test_plane_invalid_signature_still_401(monkeypatch):
    """With a Plane secret configured, a bad X-Plane-Signature is rejected with 401."""
    # Re-enable the secret that the autouse no_hmac fixture blanked.
    monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)

    plane_payload = {
        "event": "work_item.created",
        "data": {"id": "z", "project": "proj-1"},
    }
    bad_signature_headers = {"X-Plane-Signature": "deadbeef"}

    response = client.post("/webhook/plane", json=plane_payload, headers=bad_signature_headers)
    assert response.status_code == 401
|
||||
Reference in New Issue
Block a user