Files
orchestrator/tests/test_webhook_dedup.py
Dev Agent 09b1c5e1b9 feat(webhook): start pipeline on In Progress status (not on create)
Feature 1. work_item.created no longer starts the pipeline (soft QG-0 log only);
the issue stays in the backlog until moved to In Progress. The pipeline-start body
is extracted into start_pipeline(); a new issue updated handler routes a state
change to In Progress -> handle_status_start, which is idempotent: an existing task
for the plane_id is NOT re-created or restarted (protects handle_comment, which also
flips issues to In Progress). Real Plane payload: event=issue, action=updated,
data.state.id. Existing m6/plane_webhook/dedup tests updated to drive the new
trigger; new test_status_trigger.py covers created-no-op / start / idempotent.
2026-06-03 18:18:26 +03:00

285 lines
11 KiB
Python

"""ORCH-5 (M-7): webhook delivery de-duplication tests.
A retried/replayed webhook delivery must be processed exactly once. We mock
enqueue_job (imported into the gitea/plane module namespaces) and assert its
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
guarantee by re-enabling the secret.
"""
import os
import tempfile
from unittest.mock import patch, AsyncMock
import pytest
# Point the orchestrator at a throwaway DB and a one-project registry. This must
# run BEFORE the app modules are imported (same pattern as
# tests/test_webhooks.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ.update(
    {
        "ORCH_DB_PATH": _test_db,
        "ORCH_REPOS_DIR": tempfile.gettempdir(),
        "ORCH_GITEA_TOKEN": "test-token",
        "ORCH_PLANE_API_TOKEN": "test-token",
        "ORCH_GITEA_OWNER": "admin",
        "ORCH_DEFAULT_REPO": "enduro-trails",
        "ORCH_PROJECTS_JSON": (
            '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
            '"work_item_prefix": "ET", "name": "enduro-trails"}]'
        ),
    }
)
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import db as db_module # noqa: E402
from src.webhooks import gitea as gitea_mod # noqa: E402
from src.webhooks import plane as plane_mod # noqa: E402
from src import projects as projects_mod # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a freshly initialised database at ``_test_db``.

    ``settings`` is a process-wide singleton, and another test module may have
    pointed ``settings.db_path`` at its own file at import time. ``get_db()``
    reads the value live, so it is pinned to this module's DB for the duration
    of each test. The file is removed before ``init_db()`` and again after the
    test finishes.
    """

    def _drop_db_file():
        # Remove the DB file if a previous test (or run) left one behind.
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _drop_db_file()
    init_db()
    yield
    _drop_db_file()
@pytest.fixture(autouse=True)
def proj_registry():
    """Pin the shared project registry to proj-1/enduro-trails.

    The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
    built at import; test_projects.py rebuilds it via reload_projects(), which
    can leave it on the built-in default where proj-1 is unknown -> ORCH-6
    would ignore our fixtures. Force ours before each test and rebuild again
    afterwards.

    NOTE(review): neither the env var nor ``settings.projects_json`` is
    restored on teardown, so the post-test reload rebuilds from the same
    pinned value.
    """
    registry_json = (
        '[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
        '"work_item_prefix": "ET", "name": "enduro-trails"}]'
    )
    os.environ["ORCH_PROJECTS_JSON"] = registry_json
    projects_mod.settings.projects_json = registry_json
    projects_mod.reload_projects()
    yield
    projects_mod.reload_projects()
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
    """Blank both webhook secrets so dedup (not signing) is what gets tested.

    ``settings`` is shared, so the secret is overridden on the ``settings``
    object referenced by each webhook module. monkeypatch restores the
    previous values after the test; the dedicated 401 tests re-enable a secret
    on top of this fixture.
    """
    blanked_secrets = (
        (gitea_mod.settings, "gitea_webhook_secret"),
        (plane_mod.settings, "plane_webhook_secret"),
    )
    for settings_obj, secret_attr in blanked_secrets:
        monkeypatch.setattr(settings_obj, secret_attr, "", raising=False)
    yield
# One TestClient for the whole module; every test below posts through it.
client = TestClient(app)
def _events_count():
    """Return the number of rows currently stored in the ``events`` table.

    The connection is closed in ``finally`` so a failing query cannot leave
    the DB file open (an open handle would make the fixture's ``os.unlink``
    fail on platforms that lock open files).
    """
    conn = get_db()
    try:
        return conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------
def test_migration_adds_delivery_id_and_index():
    """events has a delivery_id column and an index named idx_events_delivery.

    Only the presence of the column and of the index is checked here; that the
    index is partial/unique is not asserted by this test.
    """
    conn = get_db()
    try:
        # Column 1 of both PRAGMA result sets is the column / index name.
        cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
        idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
    finally:
        # Close even if a PRAGMA raises, so the DB file is never left open.
        conn.close()
    assert "delivery_id" in cols
    assert "idx_events_delivery" in idxs
def test_migration_on_old_db_without_column_does_not_crash():
    """init_db() over a pre-existing events table WITHOUT delivery_id is safe.

    Builds an old-shape ``events`` table holding two legacy rows, runs
    ``init_db()`` on top of it, and checks that the column and the
    ``idx_events_delivery`` index were added and both rows survived.
    """
    import sqlite3  # local import: only this test talks to sqlite3 directly

    # The autouse setup_db fixture already ran init_db(); start from an empty file.
    if os.path.exists(_test_db):
        os.unlink(_test_db)

    legacy = sqlite3.connect(_test_db)
    try:
        # Old-shape events table (no delivery_id) + legacy rows, which will
        # carry a NULL delivery_id once the column is added.
        legacy.executescript(
            """
            CREATE TABLE events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT DEFAULT (datetime('now')),
                source TEXT NOT NULL,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                processed INTEGER DEFAULT 0
            );
            INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
            INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
            """
        )
        legacy.commit()
    finally:
        # Always release the file handle, even if seeding fails.
        legacy.close()

    # Should add the column + index without raising and keep the legacy rows.
    init_db()

    conn = get_db()
    try:
        cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
        idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
        n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    finally:
        conn.close()

    assert "delivery_id" in cols
    # The migration is expected to add the index as well, not just the column.
    assert "idx_events_delivery" in idxs
    assert n == 2  # legacy NULL-delivery rows preserved, partial index lets them coexist
# ---------------------------------------------------------------------------
# Gitea dedup
# ---------------------------------------------------------------------------
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
    """Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}.

    The same push body is posted twice with the same delivery id. The first
    delivery must enqueue exactly one job and log one event; the replay must
    add neither.
    """
    # Task at architecture so the ADR push would enqueue.
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            ("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
        )
        conn.commit()
    finally:
        # Close even if seeding fails, so the DB file is never left open.
        conn.close()

    body = {
        "ref": "refs/heads/feature/ET-100-x",
        "repository": {"name": "enduro-trails"},
        "commits": [
            {"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
        ],
    }
    hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}

    r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
    assert r1.status_code == 200
    assert r1.json()["status"] == "accepted"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1

    # Same delivery id again -> duplicate, no new enqueue, no new event row.
    r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
    assert r2.status_code == 200
    assert r2.json()["status"] == "duplicate"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
    """The same body under two different delivery ids is accepted twice."""
    push_body = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    # Send both deliveries first, then inspect the outcomes.
    responses = [
        client.post(
            "/webhook/gitea",
            json=push_body,
            headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": delivery_id},
        )
        for delivery_id in ("guid-1", "guid-2")
    ]
    for resp in responses:
        assert resp.json()["status"] == "accepted"
    assert _events_count() == 2
def test_gitea_fallback_hash_when_no_delivery_header():
    """No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
    push_body = {
        "ref": "refs/heads/feature/none",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }

    def _post_without_delivery_id():
        # Only the event header is sent; the delivery header is deliberately absent.
        return client.post(
            "/webhook/gitea", json=push_body, headers={"X-Gitea-Event": "push"}
        )

    first = _post_without_delivery_id()
    second = _post_without_delivery_id()
    assert first.json()["status"] == "accepted"
    assert second.json()["status"] == "duplicate"
    assert _events_count() == 1
# ---------------------------------------------------------------------------
# Plane dedup
# ---------------------------------------------------------------------------
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
    """Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.

    Feature 1: the pipeline now starts on a status change to In Progress, not
    on creation, so the dedup check is driven with an 'issue updated' event
    rather than a create event.
    """
    in_progress_state_id = "b873d9eb-993c-48cd-97ac-99a9b1623967"
    issue = {
        "id": "pd-001",
        "name": "Dedup plane task",
        "description_stripped": "A sufficiently long description for QG-0 to pass.",
        "project": "proj-1",
        "state": {"id": in_progress_state_id, "name": "In Progress", "group": "started"},
    }
    payload = {"event": "issue", "action": "updated", "data": issue}

    first = client.post("/webhook/plane", json=payload)
    assert first.status_code == 200
    assert first.json()["status"] == "accepted"
    assert mock_enqueue.call_count == 1
    assert _events_count() == 1

    # Byte-identical replay: reported as duplicate, nothing new recorded.
    replay = client.post("/webhook/plane", json=payload)
    assert replay.status_code == 200
    assert replay.json()["status"] == "duplicate"
    assert mock_enqueue.call_count == 1  # not re-enqueued
    assert _events_count() == 1
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
    """ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
    issue = {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"}
    payload = {"event": "work_item.created", "data": issue}

    first = client.post("/webhook/plane", json=payload)
    assert first.status_code == 200
    assert first.json()["status"] == "ignored"
    # Event WAS logged (dedup happens before the project filter), so a retry of
    # the SAME body is a duplicate, not re-evaluated.
    assert _events_count() == 1

    retry = client.post("/webhook/plane", json=payload)
    assert retry.json()["status"] == "duplicate"
    assert _events_count() == 1
# ---------------------------------------------------------------------------
# HMAC still guarded (acceptance #4) — independent of the dedup path
# ---------------------------------------------------------------------------
def test_gitea_invalid_signature_still_401(monkeypatch):
    """With a Gitea secret configured, a wrong X-Gitea-Signature yields 401."""
    # Re-enable the secret that the autouse no_hmac fixture blanked out.
    monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
    push_body = {
        "ref": "refs/heads/feature/x",
        "repository": {"name": "enduro-trails"},
        "commits": [],
    }
    bad_sig_headers = {"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"}
    resp = client.post("/webhook/gitea", json=push_body, headers=bad_sig_headers)
    assert resp.status_code == 401
def test_plane_invalid_signature_still_401(monkeypatch):
    """With a Plane secret configured, a wrong X-Plane-Signature yields 401."""
    # Re-enable the secret that the autouse no_hmac fixture blanked out.
    monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
    payload = {"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}}
    bad_sig_headers = {"X-Plane-Signature": "deadbeef"}
    resp = client.post("/webhook/plane", json=payload, headers=bad_sig_headers)
    assert resp.status_code == 401