Compare commits

...

11 Commits

12 changed files with 813 additions and 40 deletions

View File

@@ -16,6 +16,62 @@ from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment
logger = logging.getLogger("orchestrator.launcher")
def prune_run_logs(runs_dir, keep_days=30, keep_max=500, active_paths=None):
"""L-2: best-effort rotation of per-run logs (<runs_dir>/*.log).
A log file is removed if it is older than keep_days OR it is not within the
keep_max most-recent logs (whichever condition is met first). Only *.log
files directly inside runs_dir are considered; non-.log files and
subdirectories are never touched. Files whose path is in active_paths (the
currently running log) are always kept.
Returns the number of files removed. Never raises: any error is logged and
swallowed so log rotation can never bring the app down.
"""
removed = 0
try:
active = set()
for ap in (active_paths or []):
try:
active.add(os.path.realpath(ap))
except Exception:
active.add(ap)
if not os.path.isdir(runs_dir):
return 0
logs = []
for name in os.listdir(runs_dir):
if not name.endswith(".log"):
continue
path = os.path.join(runs_dir, name)
if not os.path.isfile(path):
continue
if os.path.realpath(path) in active:
continue
try:
mtime = os.path.getmtime(path)
except OSError:
continue
logs.append((path, mtime))
logs.sort(key=lambda t: t[1], reverse=True)
cutoff = time.time() - keep_days * 86400
for idx, (path, mtime) in enumerate(logs):
too_old = mtime < cutoff
over_max = idx >= keep_max
if too_old or over_max:
try:
os.remove(path)
removed += 1
except OSError as e:
logger.warning(f"prune_run_logs: failed to remove {path}: {e}")
except Exception as e:
logger.warning(f"prune_run_logs failed for {runs_dir}: {e}")
return removed
class AgentLauncher:
"""Launch Claude CLI agents directly (binary mounted into container)."""

View File

@@ -66,6 +66,15 @@ class Settings(BaseSettings):
agent_kill_grace_seconds: int = 20
agent_timeout_overrides_json: str = ""
# L-2: run-log rotation. Old per-run logs in <data>/runs/*.log are pruned at
# app startup (best-effort). A *.log is removed if it is older than
# log_keep_days OR not within the log_keep_max most-recent logs (whichever
# hits first). Only *.log files are touched; the active run log is skipped.
# log_keep_days -> max age in days (env ORCH_LOG_KEEP_DAYS).
# log_keep_max -> max number of newest logs to retain (env ORCH_LOG_KEEP_MAX).
log_keep_days: int = 30
log_keep_max: int = 500
# Telegram notifications
telegram_bot_token: str = ""

View File

@@ -67,6 +67,17 @@ def init_db():
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
# (which have NULL delivery_id) never collide with each other. Restart-safe:
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
_ensure_column(conn, "events", "delivery_id", "TEXT")
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
)
conn.commit()
conn.close()
@@ -141,6 +152,33 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------
def insert_event_dedup(
source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
"""Idempotently log a webhook event keyed by delivery_id.
Returns True if a NEW row was inserted (caller should dispatch the event) and
False if this delivery_id was already present (a duplicate delivery -> caller
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
index idx_events_delivery; rowcount==1 means the row was actually inserted.
"""
conn = get_db()
try:
cur = conn.execute(
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
"VALUES (?, ?, ?, ?)",
(source, event_type, payload, delivery_id),
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------

View File

@@ -60,6 +60,22 @@ async def lifespan(app: FastAPI):
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
from .config import settings as _settings
from .agents.launcher import prune_run_logs
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
_removed = prune_run_logs(
_runs_dir,
keep_days=_settings.log_keep_days,
keep_max=_settings.log_keep_max,
)
if _removed:
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
except Exception as e:
log.warning(f"Log rotation skipped: {e}")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()

View File

@@ -6,6 +6,12 @@ from .config import settings
logger = logging.getLogger("orchestrator.plane_sync")
# L-3: emoji literals used in Plane comment bodies, named for readability.
# Message text stays byte-for-byte identical to the previous output.
EMOJI_STAGE = "\U0001F504" # stage transition
EMOJI_QG_FAIL = "\u26A0\uFE0F" # quality-gate failure
EMOJI_DONE = "\u2705" # task completed
PLANE_BASE = f"{settings.plane_api_url}/api/v1"
PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
WORKSPACE = settings.plane_workspace_slug
@@ -65,6 +71,24 @@ STAGE_TO_STATE = {
}
def fetch_issue_sequence_id(issue_id: str, project_id: str) -> int | None:
"""M-6: GET the Plane issue by UUID and return its sequence_id (the
authoritative per-project number), or None if unavailable.
Returns None on network error, non-2xx, or a missing field - never raises,
so the webhook handler can fall back to DB increment and stay autonomous.
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
seq = resp.json().get("sequence_id")
return int(seq) if seq is not None else None
except Exception as e:
logger.warning(f"fetch_issue_sequence_id failed for {issue_id}: {e}")
return None
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)
@@ -89,25 +113,26 @@ def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
resp.raise_for_status()
data = resp.json()
results = data.get("results", data if isinstance(data, list) else [])
# M-6: match by sequence_id directly (the authoritative per-project
# number), parsed from the work_item_id suffix - no hardcoded prefix.
try:
target_num = int(work_item_id.rsplit("-", 1)[1])
except (IndexError, ValueError):
target_num = None
for issue in results:
seq = issue.get("sequence_id")
identifier = f"ET-{seq:03d}" if seq else ""
if identifier == work_item_id or work_item_id in issue.get("name", ""):
if target_num is not None and issue.get("sequence_id") == target_num:
return issue["id"]
# Fallback: get all issues and match by sequence_id number
if work_item_id.startswith("ET-"):
try:
target_num = int(work_item_id.split("-")[1])
except (IndexError, ValueError):
target_num = None
if target_num:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
if work_item_id in issue.get("name", ""):
return issue["id"]
# Fallback: get all issues and match by sequence_id number (any prefix)
if target_num is not None:
resp2 = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp2.raise_for_status()
data2 = resp2.json()
results2 = data2.get("results", data2 if isinstance(data2, list) else [])
for issue in results2:
if issue.get("sequence_id") == target_num:
return issue["id"]
except Exception as e:
logger.error(f"Failed to find issue for {work_item_id}: {e}")
return None
@@ -194,7 +219,7 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, new_stage, project_id)
msg = f"🔄 Stage: {old_stage}{new_stage}"
msg = f"{EMOJI_STAGE} Stage: {old_stage}{new_stage}"
if agent:
msg += f" (launching {agent})"
@@ -232,11 +257,11 @@ def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent
def notify_qg_failure(work_item_id: str, stage: str, check: str, reason: str, project_id: str = None):
"""Notify Plane about QG failure."""
add_comment(work_item_id, f"⚠️ QG failed at {stage}: {check}{reason}", project_id)
add_comment(work_item_id, f"{EMOJI_QG_FAIL} QG failed at {stage}: {check}{reason}", project_id)
def notify_done(work_item_id: str, project_id: str = None):
"""Mark issue as Done in Plane."""
project_id = _resolve_project_id(work_item_id, project_id)
update_issue_state(work_item_id, "done", project_id)
add_comment(work_item_id, " Task completed! PR merged and deployed.", project_id)
add_comment(work_item_id, f"{EMOJI_DONE} Task completed! PR merged and deployed.", project_id)

View File

@@ -5,7 +5,7 @@ Stages:
Each stage defines:
- next: the stage to advance to
- agent: the agent to launch when entering the NEXT stage
- agent: the agent to launch when advancing FROM this stage (NOT the next stage's agent)
- qg: the quality gate check required to leave this stage
"""

52
src/webhooks/_dedup.py Normal file
View File

@@ -0,0 +1,52 @@
"""ORCH-5 (M-7): webhook delivery de-duplication helper.
Webhook providers (Gitea/Plane) retry deliveries on timeout, network reset, or
manual replay. Without idempotency a retried delivery re-enters the pipeline and
spawns a duplicate run (the ET-009 incident class: parallel conveyors on one
repo). This module computes a stable per-delivery id so the webhook handlers can
INSERT-OR-IGNORE into events and skip the dispatch on a repeat.
delivery_id format: ``f"{source}:{raw_or_hash}"`` where source prefixes
gitea/plane so their id-spaces never collide. ``raw`` is the provider's native
delivery header (a GUID) when present; otherwise we fall back to a sha256 of the
body (a retried identical body yields the same hash).
"""
import hashlib
def _sha256_hex(*parts: str) -> str:
h = hashlib.sha256()
for p in parts:
h.update(p.encode("utf-8", "replace"))
return h.hexdigest()
def gitea_delivery_id(headers, event_type: str, body: bytes) -> str:
"""Compute the delivery_id for a Gitea webhook.
Prefers the ``X-Gitea-Delivery`` header (a per-delivery GUID). Falls back to
sha256(source + event_type + body) so a retried identical body still maps to
one id even if Gitea omitted the header.
"""
raw = (headers.get("X-Gitea-Delivery") or "").strip()
if not raw:
raw = _sha256_hex("gitea", event_type or "", body.decode("utf-8", "replace"))
return f"gitea:{raw}"
def plane_delivery_id(headers, body: bytes) -> str:
"""Compute the delivery_id for a Plane webhook.
Plane does not reliably send a delivery header, so we try a couple of common
names and otherwise fall back to sha256("plane" + body): a retried identical
body yields the same id.
"""
raw = (
headers.get("X-Plane-Delivery")
or headers.get("X-Hook-Delivery")
or ""
).strip()
if not raw:
raw = _sha256_hex("plane", body.decode("utf-8", "replace"))
return f"plane:{raw}"

View File

@@ -10,7 +10,14 @@ import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import gitea_delivery_id
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -51,15 +58,17 @@ async def gitea_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
# Runs AFTER HMAC verification above.
event_type = request.headers.get("X-Gitea-Event", "unknown")
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("gitea", event_type, body.decode()),
)
conn.commit()
conn.close()
delivery_id = gitea_delivery_id(request.headers, event_type, body)
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
if event_type == "push":
await handle_push(payload)

View File

@@ -15,7 +15,9 @@ from ..db import (
get_next_work_item_id,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
from ..qg.checks import QG_CHECKS
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
@@ -61,14 +63,18 @@ async def plane_webhook(request: Request):
payload = json.loads(body)
# Log event
conn = get_db()
conn.execute(
"INSERT INTO events (source, event_type, payload) VALUES (?, ?, ?)",
("plane", payload.get("event", "unknown"), body.decode()),
)
conn.commit()
conn.close()
# ORCH-5 (M-7): idempotent logging. Plane rarely sends a delivery header, so the
# delivery_id falls back to sha256("plane" + body) (a retried identical body maps
# to one id). INSERT OR IGNORE; a duplicate returns inserted=False -> log + return
# {"status":"duplicate"} WITHOUT dispatching. Runs AFTER HMAC and BEFORE the ORCH-6
# project filter, so a repeat does no extra work; the FIRST delivery of an unknown
# project still falls through to the filter below and returns {"status":"ignored"}.
event_type = payload.get("event", "unknown")
delivery_id = plane_delivery_id(request.headers, body)
inserted = insert_event_dedup("plane", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Plane webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
event = payload.get("event")
action = payload.get("action", "")
@@ -148,8 +154,20 @@ async def handle_work_item_created(data: dict, project_id: str = ""):
logger.info(f"QG-0 failed for {plane_id}: {errors}")
return
# Generate work item ID
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
# Generate work item ID.
# M-6: source of truth for the number is the Plane sequence_id. Fetch it by
# issue UUID; if Plane is unavailable, fall back to the DB increment so a
# Plane outage never blocks task creation (autonomy > exact numbering).
from ..plane_sync import fetch_issue_sequence_id
seq = fetch_issue_sequence_id(plane_id, plane_project_id)
if seq is not None:
work_item_id = f"{proj.work_item_prefix}-{seq:03d}"
else:
work_item_id = get_next_work_item_id(repo, proj.work_item_prefix)
logger.warning(
f"Plane sequence_id unavailable for {plane_id}, "
f"fell back to DB increment: {work_item_id}"
)
# Create slug from name
slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")[:30]

View File

@@ -0,0 +1,92 @@
"""L-2: tests for prune_run_logs (run-log rotation).
Verifies that old / surplus *.log files are removed while fresh logs, non-.log
files, the active log, and subdirectories are left intact. Function is
best-effort and must never raise.
"""
import os
import time
from src.agents.launcher import prune_run_logs
def _touch(path, age_days=0):
with open(path, "w") as f:
f.write("x")
mtime = time.time() - age_days * 86400
os.utime(path, (mtime, mtime))
return path
def test_old_logs_removed_fresh_kept(tmp_path):
runs = tmp_path
fresh = _touch(str(runs / "1.log"), age_days=1)
old = _touch(str(runs / "2.log"), age_days=40)
removed = prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert removed == 1
assert os.path.exists(fresh)
assert not os.path.exists(old)
def test_non_log_files_untouched(tmp_path):
runs = tmp_path
old_log = _touch(str(runs / "stale.log"), age_days=99)
keep_txt = _touch(str(runs / "notes.txt"), age_days=99)
keep_db = _touch(str(runs / "orchestrator.db"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert not os.path.exists(old_log)
assert os.path.exists(keep_txt)
assert os.path.exists(keep_db)
def test_keep_max_retains_newest(tmp_path):
runs = tmp_path
# 5 logs, all recent (within keep_days), increasing age 0..4 days.
paths = []
for i in range(5):
paths.append(_touch(str(runs / f"{i}.log"), age_days=i))
removed = prune_run_logs(str(runs), keep_days=365, keep_max=2)
# Only the 2 newest (age 0, 1) survive.
assert removed == 3
assert os.path.exists(paths[0])
assert os.path.exists(paths[1])
for p in paths[2:]:
assert not os.path.exists(p)
def test_active_log_never_removed(tmp_path):
runs = tmp_path
active = _touch(str(runs / "active.log"), age_days=99)
other = _touch(str(runs / "other.log"), age_days=99)
removed = prune_run_logs(
str(runs), keep_days=30, keep_max=500, active_paths=[active]
)
assert removed == 1
assert os.path.exists(active)
assert not os.path.exists(other)
def test_subdirs_untouched(tmp_path):
runs = tmp_path
sub = runs / "sub.log"
sub.mkdir() # a directory that happens to end in .log
old_log = _touch(str(runs / "old.log"), age_days=99)
prune_run_logs(str(runs), keep_days=30, keep_max=500)
assert sub.is_dir()
assert not os.path.exists(old_log)
def test_missing_dir_is_noop(tmp_path):
missing = tmp_path / "does-not-exist"
# Must not raise.
assert prune_run_logs(str(missing)) == 0

181
tests/test_m6_sequence.py Normal file
View File

@@ -0,0 +1,181 @@
"""M-6: work_item_id derived from Plane sequence_id (source of truth = Plane).
Covers:
* fetch_issue_sequence_id returns int on a valid Plane response (mocked httpx);
* returns None on network error / missing field WITHOUT raising;
* handle_work_item_created uses prefix-NNN when seq is available, and falls
back to get_next_work_item_id when seq is None (Plane down => autonomy);
* find_issue_id no longer hardcodes 'ET-' and matches an arbitrary prefix
(e.g. ORCH-005) by sequence_id.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_m6.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import patch, AsyncMock, MagicMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
import src.plane_sync as plane_sync # noqa: E402
ORCH_PLANE_ID = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a"
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}},'
f' {{"plane_project_id": "{ORCH_PLANE_ID}", "repo": "orchestrator",'
f' "work_item_prefix": "ORCH", "name": "orchestrator"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _mock_resp(json_body, status=200):
m = MagicMock()
m.json.return_value = json_body
m.raise_for_status.return_value = None
if status >= 400:
def _raise():
raise RuntimeError(f"HTTP {status}")
m.raise_for_status.side_effect = _raise
return m
# ---------------------------------------------------------------------------
# fetch_issue_sequence_id
# ---------------------------------------------------------------------------
def test_fetch_sequence_id_returns_int():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"sequence_id": 42})):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq == 42
assert isinstance(seq, int)
def test_fetch_sequence_id_network_error_returns_none():
with patch.object(plane_sync.httpx, "get", side_effect=RuntimeError("connection refused")):
seq = plane_sync.fetch_issue_sequence_id("issue-uuid", "proj-uuid")
assert seq is None # must not raise
def test_fetch_sequence_id_missing_field_returns_none():
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp({"error": "not found"})):
seq = plane_sync.fetch_issue_sequence_id("missing-uuid", "proj-uuid")
assert seq is None
# ---------------------------------------------------------------------------
# handle_work_item_created: seq available -> prefix-NNN
# ---------------------------------------------------------------------------
def _post(plane_id, plane_project_id=ORCH_PLANE_ID, name="A valid work item title"):
return client.post(
"/webhook/plane",
json={
"event": "work_item.created",
"data": {
"id": plane_id,
"name": name,
"description_stripped": "This is a sufficiently long description.",
"project": plane_project_id,
},
},
)
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=7)
def test_created_uses_plane_sequence_id(mock_fetch, mock_branch, mock_docs, mock_launcher):
mock_launcher.launch.return_value = 1
resp = _post("seq-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='seq-issue'").fetchone()
conn.close()
assert task is not None
assert task["work_item_id"] == "ORCH-007"
mock_fetch.assert_called_once()
@patch("src.webhooks.plane.launcher")
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=None)
@patch("src.webhooks.plane.get_next_work_item_id", return_value="ORCH-099")
def test_created_falls_back_to_db_when_plane_down(
mock_next, mock_fetch, mock_branch, mock_docs, mock_launcher
):
"""Plane unavailable (seq=None) => fall back to DB increment; task still created."""
mock_launcher.launch.return_value = 1
resp = _post("fallback-issue")
assert resp.status_code == 200
conn = get_db()
task = conn.execute("SELECT work_item_id FROM tasks WHERE plane_id='fallback-issue'").fetchone()
conn.close()
assert task is not None # autonomy: Plane down does not block creation
assert task["work_item_id"] == "ORCH-099"
mock_next.assert_called_once()
# ---------------------------------------------------------------------------
# find_issue_id: no hardcoded ET- prefix, matches arbitrary prefix by seq
# ---------------------------------------------------------------------------
def test_find_issue_id_matches_arbitrary_prefix_by_sequence():
"""ORCH-005 must resolve via the issue whose sequence_id == 5 (no ET- assumption)."""
issues = {"results": [
{"id": "uuid-a", "sequence_id": 3, "name": "something"},
{"id": "uuid-b", "sequence_id": 5, "name": "ORCH-005: target"},
{"id": "uuid-c", "sequence_id": 9, "name": "other"},
]}
# No DB row for this work_item_id => goes to the Plane API search branch.
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ORCH-005", project_id="proj-uuid")
assert found == "uuid-b"
def test_find_issue_id_matches_et_prefix_too():
"""Backward compat: ET-002 still resolves by sequence_id == 2."""
issues = {"results": [
{"id": "uuid-x", "sequence_id": 2, "name": "ET item"},
{"id": "uuid-y", "sequence_id": 7, "name": "other"},
]}
with patch.object(plane_sync.httpx, "get", return_value=_mock_resp(issues)):
found = plane_sync.find_issue_id("ET-002", project_id="proj-uuid")
assert found == "uuid-x"

277
tests/test_webhook_dedup.py Normal file
View File

@@ -0,0 +1,277 @@
"""ORCH-5 (M-7): webhook delivery de-duplication tests.
A retried/replayed webhook delivery must be processed exactly once. We mock
enqueue_job (imported into the gitea/plane module namespaces) and assert its
call_count does not grow on a repeat. HMAC is bypassed here by forcing the
webhook secrets empty (the 9 pre-existing 401 webhook tests are a separate
baseline and are NOT touched). A dedicated test keeps the 401-on-bad-signature
guarantee by re-enabling the secret.
"""
import os
import tempfile
from unittest.mock import patch, AsyncMock
import pytest
# Override DB path + project registry BEFORE importing app (same pattern as
# tests/test_webhooks.py).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_dedup.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ["ORCH_GITEA_TOKEN"] = "test-token"
os.environ["ORCH_PLANE_API_TOKEN"] = "test-token"
os.environ["ORCH_GITEA_OWNER"] = "admin"
os.environ["ORCH_DEFAULT_REPO"] = "enduro-trails"
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import db as db_module # noqa: E402
from src.webhooks import gitea as gitea_mod # noqa: E402
from src.webhooks import plane as plane_mod # noqa: E402
from src import projects as projects_mod # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
# settings is a process-wide singleton; another test module may have fixed
# settings.db_path to its own file at import time. get_db() reads it live, so
# pin it to OUR db for the duration of each test here.
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
@pytest.fixture(autouse=True)
def proj_registry():
"""Pin the shared project registry to proj-1/enduro-trails.
The registry (projects.PROJECTS / _BY_PLANE_ID) is a process-wide singleton
built at import; test_projects.py rebuilds it via reload_projects(), which can
leave it on the built-in default where proj-1 is unknown -> ORCH-6 would
ignore our fixtures. Force ours for each test, then rebuild after.
"""
os.environ["ORCH_PROJECTS_JSON"] = (
'[{"plane_project_id": "proj-1", "repo": "enduro-trails", '
'"work_item_prefix": "ET", "name": "enduro-trails"}]'
)
projects_mod.settings.projects_json = os.environ["ORCH_PROJECTS_JSON"]
projects_mod.reload_projects()
yield
projects_mod.reload_projects()
@pytest.fixture(autouse=True)
def no_hmac(monkeypatch):
"""Bypass HMAC so dedup behavior (not signing) is under test.
settings is shared, so override the secret on the module-level settings that
each verify_* function reads.
"""
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "", raising=False)
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "", raising=False)
yield
client = TestClient(app)
def _events_count():
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
return n
# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------
def test_migration_adds_delivery_id_and_index():
"""events has delivery_id + a partial unique index idx_events_delivery."""
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
idxs = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
conn.close()
assert "delivery_id" in cols
assert "idx_events_delivery" in idxs
def test_migration_on_old_db_without_column_does_not_crash():
"""init_db() over a pre-existing events table WITHOUT delivery_id is safe."""
if os.path.exists(_test_db):
os.unlink(_test_db)
import sqlite3
conn = sqlite3.connect(_test_db)
# Old-shape events table (no delivery_id) + a legacy row with NULL delivery_id.
conn.executescript(
"""
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
source TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
INSERT INTO events (source, event_type, payload) VALUES ('plane','old','{}');
INSERT INTO events (source, event_type, payload) VALUES ('gitea','old2','{}');
"""
)
conn.commit()
conn.close()
# Should add the column + index without raising and keep the legacy rows.
init_db()
conn = get_db()
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
n = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
assert "delivery_id" in cols
assert n == 2 # legacy NULL-delivery rows preserved, partial index lets them coexist
# ---------------------------------------------------------------------------
# Gitea dedup
# ---------------------------------------------------------------------------
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_duplicate_delivery_id_skips_dispatch(mock_enqueue):
"""Repeated X-Gitea-Delivery -> first processed, second {"status":"duplicate"}."""
# Task at architecture so the ADR push would enqueue.
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
("gd-001", "ET-100", "enduro-trails", "feature/ET-100-x", "architecture"),
)
conn.commit()
conn.close()
body = {
"ref": "refs/heads/feature/ET-100-x",
"repository": {"name": "enduro-trails"},
"commits": [
{"added": ["docs/work-items/ET-100/06-adr/001-d.md"], "modified": []}
],
}
hdrs = {"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-AAA"}
r1 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
# Same delivery id again -> duplicate, no new enqueue, no new event row.
r2 = client.post("/webhook/gitea", json=body, headers=hdrs)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
@patch.object(gitea_mod, "enqueue_job")
def test_gitea_two_distinct_delivery_ids_both_processed(mock_enqueue):
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-1"})
r2 = client.post("/webhook/gitea", json=body,
headers={"X-Gitea-Event": "push", "X-Gitea-Delivery": "guid-2"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "accepted"
assert _events_count() == 2
def test_gitea_fallback_hash_when_no_delivery_header():
"""No X-Gitea-Delivery -> sha256 fallback; identical body repeat = duplicate."""
body = {"ref": "refs/heads/feature/none", "repository": {"name": "enduro-trails"}, "commits": []}
r1 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
r2 = client.post("/webhook/gitea", json=body, headers={"X-Gitea-Event": "push"})
assert r1.json()["status"] == "accepted"
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# Plane dedup
# ---------------------------------------------------------------------------
@patch.object(plane_mod, "enqueue_job")
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_fallback_hash_dedup(mock_docs, mock_branch, mock_enqueue):
"""Repeated identical Plane body -> first accepted+enqueue, repeat duplicate."""
body = {
"event": "work_item.created",
"data": {
"id": "pd-001",
"name": "Dedup plane task",
"description_stripped": "A sufficiently long description for QG-0 to pass.",
"project": "proj-1",
},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "accepted"
assert mock_enqueue.call_count == 1
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.status_code == 200
assert r2.json()["status"] == "duplicate"
assert mock_enqueue.call_count == 1 # not re-enqueued
assert _events_count() == 1
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
def test_plane_unknown_project_first_delivery_still_ignored(mock_docs, mock_branch):
"""ORCH-6 intact: first delivery of an unknown project -> {"status":"ignored"}."""
body = {
"event": "work_item.created",
"data": {"id": "unk-001", "name": "Unknown project task", "project": "proj-UNKNOWN"},
}
r1 = client.post("/webhook/plane", json=body)
assert r1.status_code == 200
assert r1.json()["status"] == "ignored"
# Event WAS logged (dedup happens before the project filter), so a retry of the
# SAME body is a duplicate, not re-evaluated.
assert _events_count() == 1
r2 = client.post("/webhook/plane", json=body)
assert r2.json()["status"] == "duplicate"
assert _events_count() == 1
# ---------------------------------------------------------------------------
# HMAC still guarded (acceptance #4) — independent of the dedup path
# ---------------------------------------------------------------------------
def test_gitea_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(gitea_mod.settings, "gitea_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/gitea",
json={"ref": "refs/heads/feature/x", "repository": {"name": "enduro-trails"}, "commits": []},
headers={"X-Gitea-Event": "push", "X-Gitea-Signature": "deadbeef"},
)
assert r.status_code == 401
def test_plane_invalid_signature_still_401(monkeypatch):
monkeypatch.setattr(plane_mod.settings, "plane_webhook_secret", "s3cr3t", raising=False)
r = client.post(
"/webhook/plane",
json={"event": "work_item.created", "data": {"id": "z", "project": "proj-1"}},
headers={"X-Plane-Signature": "deadbeef"},
)
assert r.status_code == 401