From 0b924208dcc7aa3fae897a29d0969445148fb3b2 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 09:18:02 +0300 Subject: [PATCH 1/3] feat(db): add events.delivery_id + partial unique index (M-7) --- src/db.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/db.py b/src/db.py index 36590a4..0b77610 100644 --- a/src/db.py +++ b/src/db.py @@ -67,6 +67,17 @@ def init_db(): # (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table). _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0") _ensure_column(conn, "jobs", "available_at", "TEXT") + # ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL + # unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows + # (which have NULL delivery_id) never collide with each other. Restart-safe: + # _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT + # EXISTS is a no-op once the index exists, so this is safe on the live prod DB. + _ensure_column(conn, "events", "delivery_id", "TEXT") + conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery " + "ON events(delivery_id) WHERE delivery_id IS NOT NULL" + ) + conn.commit() conn.close() @@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str: return f"{prefix}-{next_num:03d}" +# --------------------------------------------------------------------------- +# ORCH-5 (M-7): idempotent webhook event logging +# --------------------------------------------------------------------------- + +def insert_event_dedup( + source: str, event_type: str, payload: str, delivery_id: str +) -> bool: + """Idempotently log a webhook event keyed by delivery_id. + + Returns True if a NEW row was inserted (caller should dispatch the event) and + False if this delivery_id was already present (a duplicate delivery -> caller + must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE + index idx_events_delivery; rowcount==1 means the row was actually inserted. + """ + conn = get_db() + try: + cur = conn.execute( + "INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) " + "VALUES (?, ?, ?, ?)", + (source, event_type, payload, delivery_id), + ) + conn.commit() + return cur.rowcount == 1 + finally: + conn.close() + + # --------------------------------------------------------------------------- # ORCH-1 (F-2b): job queue helpers # --------------------------------------------------------------------------- -- 2.49.1 From e6a7c6de8d65a2001905989c010e41a0fe23e7b9 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 09:18:02 +0300 Subject: [PATCH 2/3] feat(webhook): dedup deliveries by delivery_id (M-7) --- src/webhooks/_dedup.py | 52 ++++++++++++++++++++++++++++++++++++++++++ src/webhooks/gitea.py | 27 ++++++++++++++-------- src/webhooks/plane.py | 22 +++++++++++------- 3 files changed, 84 insertions(+), 17 deletions(-) create mode 100644 src/webhooks/_dedup.py diff --git a/src/webhooks/_dedup.py b/src/webhooks/_dedup.py new file mode 100644 index 0000000..3efd8db --- /dev/null +++ b/src/webhooks/_dedup.py @@ -0,0 +1,52 @@ +"""ORCH-5 (M-7): webhook delivery de-duplication helper. + +Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or +manual replay. Without idempotency a retried delivery re-enters the pipeline and +spawns a duplicate run (the ET-009 incident class: parallel conveyors on one +repo). This module computes a stable per-delivery id so the webhook handlers can +INSERT-OR-IGNORE into events and skip the dispatch on a repeat. + +delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes +gitea/plane so their id-spaces never collide. ``raw`` is the provider's native +delivery header (a GUID) when present; otherwise we fall back to a sha256 of the +body (a retried identical body yields the same hash). +""" + +import hashlib + + +def _sha256_hex(*parts: str) -> str: + h = hashlib.sha256() + for p in parts: + h.update(p.encode("utf-8", "replace")) + return h.hexdigest() + + +def gitea_delivery_id(headers, event_type: str, body: bytes) -> str: + """Compute the delivery_id for a Gitea webhook. + + Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to + sha256(source + event_type + body) so a retried identical body still maps to + one id even if Gitea omitted the header. + """ + raw = (headers.get("X-Gitea-Delivery") or "").strip() + if not raw: + raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace")) + return f"gitea:{raw}" + + +def plane_delivery_id(headers, body: bytes) -> str: + """Compute the delivery_id for a Plane webhook. + + Plane does not reliably send a delivery header, so we try a couple of common + names and otherwise fall back to sha256("plane" + body): a retried identical + body yields the same id. + """ + raw = ( + headers.get("X-Plane-Delivery") + or headers.get("X-Hook-Delivery") + or "" + ).strip() + if not raw: + raw = _sha256_hex("plane", body.decode("utf-8", "replace")) + return f"plane:{raw}" diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py index f6cd58a..0957294 100644 --- a/src/webhooks/gitea.py +++ b/src/webhooks/gitea.py @@ -10,7 +10,14 @@ import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job +from ..db import ( + get_db, + get_task_by_repo_branch, + update_task_stage, + enqueue_job, + insert_event_dedup, +) +from ._dedup import gitea_delivery_id from ..stages import get_next_stage, get_agent_for_stage from ..qg.checks import check_ci_green, check_review_approved from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -51,15 +58,17 @@ async def gitea_webhook(request: Request): payload = json.loads(body) - # Log event - conn = get_db() + # ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery + # GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry + # / manual replay) returns inserted=False -> log + return {"status":"duplicate"} + # WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class). + # Runs AFTER HMAC verification above. event_type = request.headers.get("X-Gitea-Event", "unknown") - conn.execute( - "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", - ("gitea", event_type, body.decode()), - ) - conn.commit() - conn.close() + delivery_id = gitea_delivery_id(request.headers, event_type, body) + inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id) + if not inserted: + logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch") + return {"status": "duplicate"} if event_type == "push": await handle_push(payload) diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index e7ba716..f6d18e7 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -15,7 +15,9 @@ from ..db import ( get_next_work_item_id, update_task_stage, enqueue_job, + insert_event_dedup, ) +from ._dedup import plane_delivery_id from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -61,14 +63,18 @@ async def plane_webhook(request: Request): payload = json.loads(body) - # Log event - conn = get_db() - conn.execute( - "INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)", - ("plane", payload.get("event", "unknown"), body.decode()), - ) - conn.commit() - conn.close() + # ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the + # delivery_id falls back to sha256("plane" + body) (a retried identical body maps + # to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return + # {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6 + # project filter, so a repeat does no extra work; the FIRST delivery of an unknown + # project still falls through to the filter below and returns {"status":"ignored"}. + event_type = payload.get("event", "unknown") + delivery_id = plane_delivery_id(request.headers, body) + inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id) + if not inserted: + logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch") + return {"status": "duplicate"} event = payload.get("event") action = payload.get("action", "") -- 2.49.1 From 4ac449ff63ab22279a59130e57ef2a906f328340 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 09:18:02 +0300 Subject: [PATCH 3/3] test(webhook): cover delivery dedup + migration safety (M-7) --- tests/test_webhook_dedup.py | 277 ++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 tests/test_webhook_dedup.py diff --git a/tests/test_webhook_dedup.py b/tests/test_webhook_dedup.py new file mode 100644 index 0000000..94f79e4 --- /dev/null +++ b/tests/test_webhook_dedup.py @@ -0,0 +1,277 @@ +"""ORCH-5 (M-7): webhook delivery de-duplication tests. + +A retried/replayed webhook delivery must be processed exactly once. We mock +enqueue_job (imported into the gitea/plane module namespaces) and assert its +call_count does not grow on a repeat. HMAC is bypassed here by forcing the +webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate +baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature +guarantee by re-enabling the secret. +""" + +import os +import tempfile +from unittest.mock import patch, AsyncMock + +import pytest + +# Override DB path + project registry BEFORE importing app (same pattern as +# tests/test_webhooks.py). +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" +os.environ["ORCH_GITEA_OWNER"] = "admin" +os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails" +os.environ["ORCH_PROJECTS_JSON"] = ( + '[{"plane_project_id": "proj-1", "repo": "enduro-trails", ' + '"work_item_prefix": "ET", "name": "enduro-trails"}]' +) + +from fastapi.testclient import TestClient # noqa: E402 +from src.main import app # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import db as db_module # noqa: E402 +from src.webhooks import gitea as gitea_mod # noqa: E402 +from src.webhooks import plane as plane_mod # noqa: E402 +from src import projects as projects_mod # noqa: E402 + + +@pytest.fixture(autouse=True) +def setup_db(monkeypatch): + # settings is a process-wide singleton; another test module may have fixed + # settings.db_path to its own file at import time. get_db() reads it live, so + # pin it to OUR db for the duration of each test here. + monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False) + if os.path.exists(_test_db): + os.unlink(_test_db) + init_db() + yield + if os.path.exists(_test_db): + os.unlink(_test_db) + + +@pytest.fixture(autouse=True) +def proj_registry(): + """Pin the shared project registry to proj-1/enduro-trails. + + The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton + built at import; test_projects.py rebuilds it via reload_projects(), which can + leave it on the built-in default where proj-1 is unknown -> ORCH-6 would + ignore our fixtures. Force ours for each test, then rebuild after. + """ + os.environ["ORCH_PROJECTS_JSON"] = ( + '[{"plane_project_id": "proj-1", "repo": "enduro-trails", ' + '"work_item_prefix": "ET", "name": "enduro-trails"}]' + ) + projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"] + projects_mod.reload_projects() + yield + projects_mod.reload_projects() + + +@pytest.fixture(autouse=True) +def no_hmac(monkeypatch): + """Bypass HMAC so dedup behavior (not signing) is under test. + + settings is shared, so override the secret on the module-level settings that + each verify_* function reads. + """ + monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False) + monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False) + yield + + +client = TestClient(app) + + +def _events_count(): + conn = get_db() + n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + conn.close() + return n + + +# --------------------------------------------------------------------------- +# Migration +# --------------------------------------------------------------------------- + +def test_migration_adds_delivery_id_and_index(): + """events has delivery_id + a partial unique index idx_events_delivery.""" + conn = get_db() + cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()] + idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()] + conn.close() + assert "delivery_id" in cols + assert "idx_events_delivery" in idxs + + +def test_migration_on_old_db_without_column_does_not_crash(): + """init_db() over a pre-existing events table WITHOUT delivery_id is safe.""" + if os.path.exists(_test_db): + os.unlink(_test_db) + import sqlite3 + conn = sqlite3.connect(_test_db) + # Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id. + conn.executescript( + """ + CREATE TABLE events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT DEFAULT (datetime('now')), + source TEXT NOT NULL, + event_type TEXT NOT NULL, + payload TEXT NOT NULL, + processed INTEGER DEFAULT 0 + ); + INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}'); + INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}'); + """ + ) + conn.commit() + conn.close() + + # Should add the column + index without raising and keep the legacy rows. + init_db() + + conn = get_db() + cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()] + n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + conn.close() + assert "delivery_id" in cols + assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist + + +# --------------------------------------------------------------------------- +# Gitea dedup +# --------------------------------------------------------------------------- + +@patch.object(gitea_mod, "enqueue_job") +def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue): + """Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}.""" + # Task at architecture so the ADR push would enqueue. + conn = get_db() + conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + ("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"), + ) + conn.commit() + conn.close() + + body = { + "ref": "refs/heads/feature/ET-100-x", + "repository": {"name": "enduro-trails"}, + "commits": [ + {"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []} + ], + } + hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"} + + r1 = client.post("/webhook/gitea", json=body, headers=hdrs) + assert r1.status_code == 200 + assert r1.json()["status"] == "accepted" + assert mock_enqueue.call_count == 1 + assert _events_count() == 1 + + # Same delivery id again -> duplicate, no new enqueue, no new event row. + r2 = client.post("/webhook/gitea", json=body, headers=hdrs) + assert r2.status_code == 200 + assert r2.json()["status"] == "duplicate" + assert mock_enqueue.call_count == 1 + assert _events_count() == 1 + + +@patch.object(gitea_mod, "enqueue_job") +def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue): + body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []} + r1 = client.post("/webhook/gitea", json=body, + headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"}) + r2 = client.post("/webhook/gitea", json=body, + headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"}) + assert r1.json()["status"] == "accepted" + assert r2.json()["status"] == "accepted" + assert _events_count() == 2 + + +def test_gitea_fallback_hash_when_no_delivery_header(): + """No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate.""" + body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []} + r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"}) + r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"}) + assert r1.json()["status"] == "accepted" + assert r2.json()["status"] == "duplicate" + assert _events_count() == 1 + + +# --------------------------------------------------------------------------- +# Plane dedup +# --------------------------------------------------------------------------- + +@patch.object(plane_mod, "enqueue_job") +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue): + """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.""" + body = { + "event": "work_item.created", + "data": { + "id": "pd-001", + "name": "Dedup plane task", + "description_stripped": "A sufficiently long description for QG-0 to pass.", + "project": "proj-1", + }, + } + r1 = client.post("/webhook/plane", json=body) + assert r1.status_code == 200 + assert r1.json()["status"] == "accepted" + assert mock_enqueue.call_count == 1 + assert _events_count() == 1 + + r2 = client.post("/webhook/plane", json=body) + assert r2.status_code == 200 + assert r2.json()["status"] == "duplicate" + assert mock_enqueue.call_count == 1 # not re-enqueued + assert _events_count() == 1 + + +@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock) +@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock) +def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch): + """ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}.""" + body = { + "event": "work_item.created", + "data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"}, + } + r1 = client.post("/webhook/plane", json=body) + assert r1.status_code == 200 + assert r1.json()["status"] == "ignored" + # Event WAS logged (dedup happens before the project filter), so a retry of the + # SAME body is a duplicate, not re-evaluated. + assert _events_count() == 1 + r2 = client.post("/webhook/plane", json=body) + assert r2.json()["status"] == "duplicate" + assert _events_count() == 1 + + +# --------------------------------------------------------------------------- +# HMAC still guarded (acceptance #4) — independent of the dedup path +# --------------------------------------------------------------------------- + +def test_gitea_invalid_signature_still_401(monkeypatch): + monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False) + r = client.post( + "/webhook/gitea", + json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []}, + headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"}, + ) + assert r.status_code == 401 + + +def test_plane_invalid_signature_still_401(monkeypatch): + monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False) + r = client.post( + "/webhook/plane", + json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}}, + headers={"X-Plane-Signature": "deadbeef"}, + ) + assert r.status_code == 401 -- 2.49.1