Feature 1. work_item.created no longer starts the pipeline (soft QG-0 log only); the issue stays in the backlog until moved to In Progress. The pipeline-start body is extracted into start_pipeline(); a new issue updated handler routes a state change to In Progress -> handle_status_start, which is idempotent: an existing task for the plane_id is NOT re-created or restarted (protects handle_comment, which also flips issues to In Progress). Real Plane payload: event=issue, action=updated, data.state.id. Existing m6/plane_webhook/dedup tests updated to drive the new trigger; new test_status_trigger.py covers created-no-op / start / idempotent.
285 lines
11 KiB
Python
285 lines
11 KiB
Python
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
|
|
|
|
A retried/replayed webhook delivery must be processed exactly once. We mock
|
|
enqueue_job (imported into the gitea/plane module namespaces) and assert its
|
|
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
|
|
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
|
|
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
|
|
guarantee by re-enabling the secret.
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
from unittest.mock import patch, AsyncMock
|
|
|
|
import pytest
|
|
|
|
# Override DB path + project registry BEFORE importing app (same pattern as
|
|
# tests/test_webhooks.py).
|
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
|
|
os.environ["ORCH_DB_PATH"] = _test_db
|
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
|
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
|
|
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
|
|
os.environ["ORCH_GITEA_OWNER"] = "admin"
|
|
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
|
|
os.environ["ORCH_PROJECTS_JSON"] = (
|
|
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
|
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
|
)
|
|
|
|
from fastapi.testclient import TestClient # noqa: E402
|
|
from src.main import app # noqa: E402
|
|
from src.db import init_db, get_db # noqa: E402
|
|
from src import db as db_module # noqa: E402
|
|
from src.webhooks import gitea as gitea_mod # noqa: E402
|
|
from src.webhooks import plane as plane_mod # noqa: E402
|
|
from src import projects as projects_mod # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_db(monkeypatch):
|
|
# settings is a process-wide singleton; another test module may have fixed
|
|
# settings.db_path to its own file at import time. get_db() reads it live, so
|
|
# pin it to OUR db for the duration of each test here.
|
|
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
init_db()
|
|
yield
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def proj_registry():
|
|
"""Pin the shared project registry to proj-1/enduro-trails.
|
|
|
|
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
|
|
built at import; test_projects.py rebuilds it via reload_projects(), which can
|
|
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
|
|
ignore our fixtures. Force ours for each test, then rebuild after.
|
|
"""
|
|
os.environ["ORCH_PROJECTS_JSON"] = (
|
|
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
|
|
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
|
|
)
|
|
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
|
|
projects_mod.reload_projects()
|
|
yield
|
|
projects_mod.reload_projects()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def no_hmac(monkeypatch):
|
|
"""Bypass HMAC so dedup behavior (not signing) is under test.
|
|
|
|
settings is shared, so override the secret on the module-level settings that
|
|
each verify_* function reads.
|
|
"""
|
|
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
|
|
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
|
|
yield
|
|
|
|
|
|
client = TestClient(app)
|
|
|
|
|
|
def _events_count():
|
|
conn = get_db()
|
|
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
|
conn.close()
|
|
return n
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Migration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_migration_adds_delivery_id_and_index():
|
|
"""events has delivery_id + a partial unique index idx_events_delivery."""
|
|
conn = get_db()
|
|
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
|
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
|
|
conn.close()
|
|
assert "delivery_id" in cols
|
|
assert "idx_events_delivery" in idxs
|
|
|
|
|
|
def test_migration_on_old_db_without_column_does_not_crash():
|
|
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
import sqlite3
|
|
conn = sqlite3.connect(_test_db)
|
|
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
|
|
conn.executescript(
|
|
"""
|
|
CREATE TABLE events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT DEFAULT (datetime('now')),
|
|
source TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
payload TEXT NOT NULL,
|
|
processed INTEGER DEFAULT 0
|
|
);
|
|
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
|
|
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
|
|
"""
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Should add the column + index without raising and keep the legacy rows.
|
|
init_db()
|
|
|
|
conn = get_db()
|
|
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
|
|
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
|
conn.close()
|
|
assert "delivery_id" in cols
|
|
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gitea dedup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@patch.object(gitea_mod, "enqueue_job")
|
|
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
|
|
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
|
|
# Task at architecture so the ADR push would enqueue.
|
|
conn = get_db()
|
|
conn.execute(
|
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
|
"VALUES (?, ?, ?, ?, ?)",
|
|
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
body = {
|
|
"ref": "refs/heads/feature/ET-100-x",
|
|
"repository": {"name": "enduro-trails"},
|
|
"commits": [
|
|
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
|
|
],
|
|
}
|
|
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
|
|
|
|
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
|
assert r1.status_code == 200
|
|
assert r1.json()["status"] == "accepted"
|
|
assert mock_enqueue.call_count == 1
|
|
assert _events_count() == 1
|
|
|
|
# Same delivery id again -> duplicate, no new enqueue, no new event row.
|
|
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
|
|
assert r2.status_code == 200
|
|
assert r2.json()["status"] == "duplicate"
|
|
assert mock_enqueue.call_count == 1
|
|
assert _events_count() == 1
|
|
|
|
|
|
@patch.object(gitea_mod, "enqueue_job")
|
|
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
|
|
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
|
r1 = client.post("/webhook/gitea", json=body,
|
|
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
|
|
r2 = client.post("/webhook/gitea", json=body,
|
|
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
|
|
assert r1.json()["status"] == "accepted"
|
|
assert r2.json()["status"] == "accepted"
|
|
assert _events_count() == 2
|
|
|
|
|
|
def test_gitea_fallback_hash_when_no_delivery_header():
|
|
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
|
|
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
|
|
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
|
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
|
|
assert r1.json()["status"] == "accepted"
|
|
assert r2.json()["status"] == "duplicate"
|
|
assert _events_count() == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plane dedup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@patch.object(plane_mod, "enqueue_job")
|
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
|
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
|
|
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate.
|
|
|
|
Feature 1: the pipeline now starts on a status change to In Progress, not on
|
|
creation, so this drives the dedup test with an 'issue updated' event.
|
|
"""
|
|
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
|
body = {
|
|
"event": "issue",
|
|
"action": "updated",
|
|
"data": {
|
|
"id": "pd-001",
|
|
"name": "Dedup plane task",
|
|
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
|
"project": "proj-1",
|
|
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
|
},
|
|
}
|
|
r1 = client.post("/webhook/plane", json=body)
|
|
assert r1.status_code == 200
|
|
assert r1.json()["status"] == "accepted"
|
|
assert mock_enqueue.call_count == 1
|
|
assert _events_count() == 1
|
|
|
|
r2 = client.post("/webhook/plane", json=body)
|
|
assert r2.status_code == 200
|
|
assert r2.json()["status"] == "duplicate"
|
|
assert mock_enqueue.call_count == 1 # not re-enqueued
|
|
assert _events_count() == 1
|
|
|
|
|
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
|
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
|
|
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
|
|
body = {
|
|
"event": "work_item.created",
|
|
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
|
|
}
|
|
r1 = client.post("/webhook/plane", json=body)
|
|
assert r1.status_code == 200
|
|
assert r1.json()["status"] == "ignored"
|
|
# Event WAS logged (dedup happens before the project filter), so a retry of the
|
|
# SAME body is a duplicate, not re-evaluated.
|
|
assert _events_count() == 1
|
|
r2 = client.post("/webhook/plane", json=body)
|
|
assert r2.json()["status"] == "duplicate"
|
|
assert _events_count() == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HMAC still guarded (acceptance #4) — independent of the dedup path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_gitea_invalid_signature_still_401(monkeypatch):
|
|
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
|
|
r = client.post(
|
|
"/webhook/gitea",
|
|
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
|
|
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
|
|
)
|
|
assert r.status_code == 401
|
|
|
|
|
|
def test_plane_invalid_signature_still_401(monkeypatch):
|
|
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
|
|
r = client.post(
|
|
"/webhook/plane",
|
|
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
|
|
headers={"X-Plane-Signature": "deadbeef"},
|
|
)
|
|
assert r.status_code == 401
|