feat(webhook): dedup deliveries by delivery_id (M-7)
This commit is contained in:
52
src/webhooks/_dedup.py
Normal file
52
src/webhooks/_dedup.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
|
||||
|
||||
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
|
||||
manual replay. Without idempotency a retried delivery re-enters the pipeline and
|
||||
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
|
||||
repo). This module computes a stable per-delivery id so the webhook handlers can
|
||||
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
|
||||
|
||||
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
|
||||
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
|
||||
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
|
||||
body (a retried identical body yields the same hash).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
|
||||
|
||||
def _sha256_hex(*parts: str) -> str:
|
||||
h = hashlib.sha256()
|
||||
for p in parts:
|
||||
h.update(p.encode("utf-8", "replace"))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
|
||||
"""Compute the delivery_id for a Gitea webhook.
|
||||
|
||||
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
|
||||
sha256(source + event_type + body) so a retried identical body still maps to
|
||||
one id even if Gitea omitted the header.
|
||||
"""
|
||||
raw = (headers.get("X-Gitea-Delivery") or "").strip()
|
||||
if not raw:
|
||||
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
|
||||
return f"gitea:{raw}"
|
||||
|
||||
|
||||
def plane_delivery_id(headers, body: bytes) -> str:
|
||||
"""Compute the delivery_id for a Plane webhook.
|
||||
|
||||
Plane does not reliably send a delivery header, so we try a couple of common
|
||||
names and otherwise fall back to sha256("plane" + body): a retried identical
|
||||
body yields the same id.
|
||||
"""
|
||||
raw = (
|
||||
headers.get("X-Plane-Delivery")
|
||||
or headers.get("X-Hook-Delivery")
|
||||
or ""
|
||||
).strip()
|
||||
if not raw:
|
||||
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
|
||||
return f"plane:{raw}"
|
||||
@@ -10,7 +10,14 @@ import httpx
|
||||
from fastapi import APIRouter, Request, HTTPException
|
||||
|
||||
from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
|
||||
from ..db import (
|
||||
get_db,
|
||||
get_task_by_repo_branch,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import gitea_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage
|
||||
from ..qg.checks import check_ci_green, check_review_approved
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
|
||||
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
|
||||
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
|
||||
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
|
||||
# Runs AFTER HMAC verification above.
|
||||
event_type = request.headers.get("X-Gitea-Event", "unknown")
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("gitea", event_type, body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
delivery_id = gitea_delivery_id(request.headers, event_type, body)
|
||||
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
if event_type == "push":
|
||||
await handle_push(payload)
|
||||
|
||||
@@ -15,7 +15,9 @@ from ..db import (
|
||||
get_next_work_item_id,
|
||||
update_task_stage,
|
||||
enqueue_job,
|
||||
insert_event_dedup,
|
||||
)
|
||||
from ._dedup import plane_delivery_id
|
||||
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
|
||||
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
|
||||
|
||||
payload = json.loads(body)
|
||||
|
||||
# Log event
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
|
||||
("plane", payload.get("event", "unknown"), body.decode()),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
|
||||
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
|
||||
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
|
||||
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
|
||||
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
|
||||
# project still falls through to the filter below and returns {"status":"ignored"}.
|
||||
event_type = payload.get("event", "unknown")
|
||||
delivery_id = plane_delivery_id(request.headers, body)
|
||||
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
|
||||
if not inserted:
|
||||
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
|
||||
return {"status": "duplicate"}
|
||||
|
||||
event = payload.get("event")
|
||||
action = payload.get("action", "")
|
||||
|
||||
Reference in New Issue
Block a user