From e6a7c6de8d65a2001905989c010e41a0fe23e7b9 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 09:18:02 +0300 Subject: [PATCH] feat(webhook): dedup deliveries by delivery_id (M-7) --- src/webhooks/_dedup.py | 52 ++++++++++++++++++++++++++++++++++++++++++ src/webhooks/gitea.py | 27 ++++++++++++++-------- src/webhooks/plane.py | 22 +++++++++++------- 3 files changed, 84 insertions(+), 17 deletions(-) create mode 100644 src/webhooks/_dedup.py diff --git a/src/webhooks/_dedup.py b/src/webhooks/_dedup.py new file mode 100644 index 0000000..3efd8db --- /dev/null +++ b/src/webhooks/_dedup.py @@ -0,0 +1,52 @@ +"""ORCH-5 (M-7): webhook delivery de-duplication helper. + +Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or +manual replay. Without idempotency a retried delivery re-enters the pipeline and +spawns a duplicate run (the ET-009 incident class: parallel conveyors on one +repo). This module computes a stable per-delivery id so the webhook handlers can +INSERT-OR-IGNORE into events and skip the dispatch on a repeat. + +delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes +gitea/plane so their id-spaces never collide. ``raw`` is the provider's native +delivery header (a GUID) when present; otherwise we fall back to a sha256 of the +body (a retried identical body yields the same hash). +""" + +import hashlib + + +def _sha256_hex(*parts: str) -> str: + h = hashlib.sha256() + for p in parts: + h.update(p.encode("utf-8", "replace")) + return h.hexdigest() + + +def gitea_delivery_id(headers, event_type: str, body: bytes) -> str: + """Compute the delivery_id for a Gitea webhook. + + Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to + sha256(source + event_type + body) so a retried identical body still maps to + one id even if Gitea omitted the header. + """ + raw = (headers.get("X-Gitea-Delivery") or "").strip() + if not raw: + raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace")) + return f"gitea:{raw}" + + +def plane_delivery_id(headers, body: bytes) -> str: + """Compute the delivery_id for a Plane webhook. + + Plane does not reliably send a delivery header, so we try a couple of common + names and otherwise fall back to sha256("plane" + body): a retried identical + body yields the same id. + """ + raw = ( + headers.get("X-Plane-Delivery") + or headers.get("X-Hook-Delivery") + or "" + ).strip() + if not raw: + raw = _sha256_hex("plane", body.decode("utf-8", "replace")) + return f"plane:{raw}" diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py index f6cd58a..0957294 100644 --- a/src/webhooks/gitea.py +++ b/src/webhooks/gitea.py @@ -10,7 +10,14 @@ import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job +from ..db import ( + get_db, + get_task_by_repo_branch, + update_task_stage, + enqueue_job, + insert_event_dedup, +) +from ._dedup import gitea_delivery_id from ..stages import get_next_stage, get_agent_for_stage from ..qg.checks import check_ci_green, check_review_approved from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -51,15 +58,17 @@ async def gitea_webhook(request: Request): payload = json.loads(body) - # Log event - conn = get_db() + # ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery + # GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry + # / manual replay) returns inserted=False -> log + return {"status":"duplicate"} + # WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class). + # Runs AFTER HMAC verification above. event_type = request.headers.get("X-Gitea-Event", "unknown") - conn.execute( - "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", - ("gitea", event_type, body.decode()), - ) - conn.commit() - conn.close() + delivery_id = gitea_delivery_id(request.headers, event_type, body) + inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id) + if not inserted: + logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch") + return {"status": "duplicate"} if event_type == "push": await handle_push(payload) diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index e7ba716..f6d18e7 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -15,7 +15,9 @@ from ..db import ( get_next_work_item_id, update_task_stage, enqueue_job, + insert_event_dedup, ) +from ._dedup import plane_delivery_id from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -61,14 +63,18 @@ async def plane_webhook(request: Request): payload = json.loads(body) - # Log event - conn = get_db() - conn.execute( - "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", - ("plane", payload.get("event", "unknown"), body.decode()), - ) - conn.commit() - conn.close() + # ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the + # delivery_id falls back to sha256("plane" + body) (a retried identical body maps + # to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return + # {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6 + # project filter, so a repeat does no extra work; the FIRST delivery of an unknown + # project still falls through to the filter below and returns {"status":"ignored"}. + event_type = payload.get("event", "unknown") + delivery_id = plane_delivery_id(request.headers, body) + inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id) + if not inserted: + logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch") + return {"status": "duplicate"} event = payload.get("event") action = payload.get("action", "")