"""ORCH-053: tests for the Plane-side reconciler (F-2) + sha-resolve helpers.

F-2 polls the Plane API per project (``list_issues_by_state``) and REPLAYS a
missed In Progress / Approved / Rejected transition through the EXISTING
``webhooks.plane.handle_status_start`` / ``handle_verdict`` handlers — it never
duplicates pipeline logic. These tests mock those handlers (AsyncMock) and the
Plane API helpers, and verify the dispatch / idempotency / multi-project rules.

TC-15 is the AC-4 anti-dup integration test for ``create_task_atomic`` against
a real isolated sqlite DB under concurrency. TC-16 exercises
``plane_sync.list_issues_by_state`` directly (pagination + the never-raise
contract).

The ORCH-068 section (TC-01..TC-10) covers the livelock fix: terminal-state
exclusion, confirmed-change-only unblock notifications, and their dedup.
"""
import os
import tempfile
import threading
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

import pytest

# Point the app at an isolated DB / repos dir BEFORE importing ``src``
# (hence the E402 suppressions on the imports below).
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_reconciler_plane.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as _db  # noqa: E402
from src.db import init_db, get_db, enqueue_job, create_task_atomic  # noqa: E402
from src import reconciler as reconciler_mod  # noqa: E402
from src import plane_sync  # noqa: E402
from src.reconciler import Reconciler  # noqa: E402

_IN_PROGRESS = "uuid-in-progress"
_APPROVED = "uuid-approved"
_REJECTED = "uuid-rejected"
_OLD_TS = "2020-01-01T00:00:00Z"  # well past any grace


@pytest.fixture(autouse=True)
def fresh_db(monkeypatch):
    """Give every test a freshly initialised sqlite DB at ``_test_db``."""
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    yield


@pytest.fixture
def single_project(monkeypatch):
    """Restrict F-2 to a single fake project and stub its state resolution.

    The state *groups* are stubbed as well (all three actionable states in
    the non-terminal ``started`` group). That way no test using this fixture
    reaches the real ``get_project_state_groups`` lookup. Tests that need
    terminal groups override this by patching the same attribute again.
    """
    proj = SimpleNamespace(
        plane_project_id="proj-1",
        repo="enduro-trails",
        work_item_prefix="ET",
    )
    monkeypatch.setattr(reconciler_mod.projects, "PROJECTS", [proj])
    monkeypatch.setattr(
        reconciler_mod,
        "get_project_states",
        lambda pid: {
            "in_progress": _IN_PROGRESS,
            "approved": _APPROVED,
            "rejected": _REJECTED,
        },
    )
    monkeypatch.setattr(
        reconciler_mod,
        "get_project_state_groups",
        lambda pid: {
            _IN_PROGRESS: "started",
            _APPROVED: "started",
            _REJECTED: "started",
        },
    )
    return proj


def _make_task(plane_id, stage="review", repo="enduro-trails",
               branch="feature/ET-001-x", wi="ET-001"):
    """Insert a task row and return its row id.

    ``plane_issue_id`` is set to the same value as ``plane_id``. The
    connection is closed even if the INSERT fails.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (plane_id, wi, repo, branch, stage, plane_id),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        conn.close()


def _patch_handlers(monkeypatch):
    """Replace both webhook handlers with AsyncMocks; return ``(start, verdict)``."""
    start = AsyncMock()
    verdict = AsyncMock()
    monkeypatch.setattr(reconciler_mod, "handle_status_start", start)
    monkeypatch.setattr(reconciler_mod, "handle_verdict", verdict)
    return start, verdict


def _patch_issues(monkeypatch, issues):
    """Make every ``list_issues_by_state`` call return a fresh list of ``issues``."""
    monkeypatch.setattr(
        reconciler_mod, "list_issues_by_state", lambda pid, states: list(issues)
    )


# ---------------------------------------------------------------------------
# TC-11: In Progress without a task -> handle_status_start once.
# ---------------------------------------------------------------------------
def test_tc11_in_progress_without_task_starts_pipeline(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_issues(monkeypatch, [
        {"id": "iss-1", "state": {"id": _IN_PROGRESS}, "updated_at": _OLD_TS,
         "name": "Some issue"},
    ])

    Reconciler().reconcile_plane_once()

    assert start.call_count == 1
    issue_data, project_id = start.call_args.args
    assert issue_data["id"] == "iss-1"
    assert issue_data["state"]["id"] == _IN_PROGRESS
    assert project_id == "proj-1"
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-12: Approved with an existing task, no active job -> handle_verdict(True).
# ---------------------------------------------------------------------------
def test_tc12_approved_replays_verdict(monkeypatch, single_project):
    start_mock, verdict_mock = _patch_handlers(monkeypatch)
    _make_task("iss-2", stage="review")
    approved_issue = {"id": "iss-2", "state": {"id": _APPROVED}, "updated_at": _OLD_TS}
    _patch_issues(monkeypatch, [approved_issue])

    Reconciler().reconcile_plane_once()

    # Exactly one replay, flagged as an approval; the pipeline is not (re)started.
    assert verdict_mock.call_count == 1
    assert verdict_mock.call_args.kwargs.get("approved") is True
    start_mock.assert_not_called()


# ---------------------------------------------------------------------------
# TC-13: Rejected with an existing task -> handle_verdict(False).
# ---------------------------------------------------------------------------
def test_tc13_rejected_replays_verdict(monkeypatch, single_project):
    start_mock, verdict_mock = _patch_handlers(monkeypatch)
    _make_task("iss-3", stage="review")
    rejected_issue = {"id": "iss-3", "state": {"id": _REJECTED}, "updated_at": _OLD_TS}
    _patch_issues(monkeypatch, [rejected_issue])

    Reconciler().reconcile_plane_once()

    # Exactly one replay, flagged as a rejection; the pipeline is not (re)started.
    assert verdict_mock.call_count == 1
    assert verdict_mock.call_args.kwargs.get("approved") is False
    start_mock.assert_not_called()


# ---------------------------------------------------------------------------
# TC-14: idempotency — an active job means a live webhook is in flight -> skip.
# ---------------------------------------------------------------------------
def test_tc14_active_job_skips(monkeypatch, single_project):
    start_mock, verdict_mock = _patch_handlers(monkeypatch)
    task_id = _make_task("iss-4", stage="review")
    # An active reviewer job already exists for this task.
    enqueue_job("reviewer", "enduro-trails", task_id=task_id)
    approved_issue = {"id": "iss-4", "state": {"id": _APPROVED}, "updated_at": _OLD_TS}
    _patch_issues(monkeypatch, [approved_issue])

    Reconciler().reconcile_plane_once()

    # Nothing is replayed while the live job is in flight.
    start_mock.assert_not_called()
    verdict_mock.assert_not_called()


# ---------------------------------------------------------------------------
# TC-14b: an issue updated within the grace window is left alone — its
# webhook may merely be delayed, not lost.
# ---------------------------------------------------------------------------
def test_tc14b_within_grace_skipped(monkeypatch, single_project):
    from datetime import datetime, timezone

    start, verdict = _patch_handlers(monkeypatch)
    _make_task("iss-5", stage="review")
    fresh_ts = datetime.now(timezone.utc).isoformat()  # updated "just now"
    _patch_issues(monkeypatch, [
        {"id": "iss-5", "state": {"id": _APPROVED}, "updated_at": fresh_ts},
    ])

    Reconciler().reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-15 (AC-4): atomic anti-dup — concurrent create_task_atomic for one
# plane_id yields exactly ONE row and ONE created=True.
# ---------------------------------------------------------------------------
def test_tc15_create_task_atomic_no_duplicate():
    n_workers = 8
    results = []
    errors = []
    barrier = threading.Barrier(n_workers)

    def worker():
        # Exceptions raised in a thread never reach pytest on their own, so
        # record them and assert on them from the test thread below.
        try:
            barrier.wait(timeout=10)  # release all workers together to maximise the race
            row, created = create_task_atomic(
                "plane-dup", "ET-099", "enduro-trails", "feature/ET-099-x",
                "analysis", "Dup race",
            )
            results.append((row["id"], created))
        except Exception as exc:  # noqa: BLE001 - surfaced via `errors` below
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=30)

    assert not any(t.is_alive() for t in threads), "a worker thread hung"
    assert errors == [], f"worker(s) raised: {errors!r}"
    assert len(results) == n_workers  # every worker reported back

    created_flags = [c for _, c in results]
    assert created_flags.count(True) == 1  # exactly one winner
    assert created_flags.count(False) == n_workers - 1  # the rest see the existing row

    conn = get_db()
    try:
        n = conn.execute(
            "SELECT COUNT(*) FROM tasks WHERE plane_id = 'plane-dup'"
        ).fetchone()[0]
    finally:
        conn.close()
    assert n == 1  # only one task row ever created
    # All callers see the same row id (the single task).
    assert len({rid for rid, _ in results}) == 1


# ---------------------------------------------------------------------------
# TC-16: list_issues_by_state — never-raise on API error, filter+paginate on OK.
# ---------------------------------------------------------------------------
def test_tc16_list_issues_never_raises_on_error(monkeypatch):
    def exploding_get(*args, **kwargs):
        raise RuntimeError("plane down")

    monkeypatch.setattr(plane_sync.httpx, "get", exploding_get)

    # The helper must swallow the transport error and report "no issues".
    assert plane_sync.list_issues_by_state("proj-1", [_APPROVED]) == []


def test_tc16_list_issues_paginates_and_filters(monkeypatch):
    first_page = {
        "results": [
            {"id": "a", "state": {"id": _APPROVED}},
            {"id": "b", "state": {"id": "other"}},
        ],
        "next_page_results": True,
        "next_cursor": "cur2",
    }
    second_page = {
        "results": [
            {"id": "c", "state": _APPROVED},  # bare-uuid state shape
            {"id": "d", "state": {"id": _REJECTED}},
        ],
        "next_page_results": False,
        "next_cursor": None,
    }
    remaining_pages = iter([first_page, second_page])

    def fake_get(url, headers=None, params=None, timeout=None):
        # Each HTTP call hands back the next canned page.
        response = MagicMock()
        response.raise_for_status.return_value = None
        response.json.return_value = next(remaining_pages)
        return response

    monkeypatch.setattr(plane_sync.httpx, "get", fake_get)

    found = plane_sync.list_issues_by_state("proj-1", [_APPROVED, _REJECTED])

    # 'b' is in state 'other' and must be filtered out.
    assert {issue["id"] for issue in found} == {"a", "c", "d"}


# ---------------------------------------------------------------------------
# TC-17: F-2 polls EVERY registry project and resolves states per-project.
# ---------------------------------------------------------------------------
def test_tc17_polls_all_projects_resolves_states_per_project(monkeypatch):
    _patch_handlers(monkeypatch)  # never reach the real webhook handlers
    from src import projects as projects_mod

    projects_mod.reload_projects()
    expected_ids = {p.plane_project_id for p in projects_mod.PROJECTS}
    assert len(expected_ids) >= 2  # enduro + orchestrator in the default registry

    states_calls = []
    issues_calls = []

    def fake_states(pid):
        states_calls.append(pid)
        return {"in_progress": _IN_PROGRESS, "approved": _APPROVED, "rejected": _REJECTED}

    def fake_issues(pid, states):
        issues_calls.append((pid, tuple(states)))
        return []

    monkeypatch.setattr(reconciler_mod, "get_project_states", fake_states)
    monkeypatch.setattr(reconciler_mod, "list_issues_by_state", fake_issues)
    # Keep the tick hermetic: do not fall through to the real state-group lookup.
    monkeypatch.setattr(
        reconciler_mod,
        "get_project_state_groups",
        lambda pid: {
            _IN_PROGRESS: "started",
            _APPROVED: "started",
            _REJECTED: "started",
        },
    )

    Reconciler().reconcile_plane_once()

    assert set(states_calls) == expected_ids
    assert {pid for pid, _ in issues_calls} == expected_ids
    # state uuids are resolved per-project (not hardcoded): each call carries them.
    for _pid, states in issues_calls:
        assert set(states) == {_IN_PROGRESS, _APPROVED, _REJECTED}


# ===========================================================================
# ORCH-068: livelock-fix — terminal exclusion (D1) + confirmed-change unblock
# (D2) + dedup (TR-3). The old code spammed `_note_unblock` every ~120s for a
# fully synchronized Done task (incident: ET-002, 191+ Telegram messages/night).
# ===========================================================================

_DONE = "uuid-done"
_CANCELLED = "uuid-cancelled"


def _patch_states_with_terminals(monkeypatch, *, alias_done_to_approved=False):
    """Patch F-2 state resolution to include terminals + their groups.

    ``alias_done_to_approved`` models the regression trigger (ORCH-066): the
    project "collapses" Done onto the approved UUID, so a genuinely-Done issue
    would enter the ``approved`` branch by UUID. Only the state GROUP
    (``completed``) disentangles it — the heart of D1.

    Returns the patched ``(states, groups)`` mappings.
    """
    done_uuid = _APPROVED if alias_done_to_approved else _DONE
    states = {
        "in_progress": _IN_PROGRESS,
        "approved": _APPROVED,
        "rejected": _REJECTED,
        "done": done_uuid,
        "cancelled": _CANCELLED,
    }
    groups = {
        _IN_PROGRESS: "started",
        _APPROVED: "started",
        _REJECTED: "started",
        # Inserted after _APPROVED, so when aliased this entry wins and the
        # shared UUID resolves to the completed group.
        done_uuid: "completed",
        _CANCELLED: "cancelled",
    }
    monkeypatch.setattr(reconciler_mod, "get_project_states", lambda pid: states)
    monkeypatch.setattr(
        reconciler_mod, "get_project_state_groups", lambda pid: groups
    )
    return states, groups


def _spy_telegram(monkeypatch):
    """Capture every ``send_telegram`` message into the returned list."""
    sent = []
    monkeypatch.setattr(reconciler_mod, "send_telegram", lambda msg: sent.append(msg))
    return sent


def _job_count():
    """Return the total number of rows in the ``jobs`` table."""
    conn = get_db()
    try:
        return conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# TC-01 (AC-1, AC-7): synchronized Done task -> total silence, 0 jobs.
# ---------------------------------------------------------------------------
def test_tc01_synced_done_is_silent(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_states_with_terminals(monkeypatch)
    sent = _spy_telegram(monkeypatch)
    _make_task("iss-done", stage="done", wi="ET-002")
    _patch_issues(monkeypatch, [
        {"id": "iss-done", "state": {"id": _DONE}, "updated_at": _OLD_TS},
    ])

    recon = Reconciler()
    recon.reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()
    assert sent == []
    assert recon.unblocked_total == 0
    assert recon.skipped_terminal_total == 1
    assert _job_count() == 0


# ---------------------------------------------------------------------------
# TC-02 (AC-2): Done UUID aliased onto approved -> still excluded by GROUP.
# ---------------------------------------------------------------------------
def test_tc02_terminal_aliased_to_approved_excluded(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_states_with_terminals(monkeypatch, alias_done_to_approved=True)
    sent = _spy_telegram(monkeypatch)
    # Task is Done; its Plane state UUID equals the approved UUID (aliasing).
    _make_task("iss-alias", stage="done", wi="ET-002")
    _patch_issues(monkeypatch, [
        {"id": "iss-alias", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])

    recon = Reconciler()
    recon.reconcile_plane_once()

    # Without the group check this would enter the approved branch and notify.
    start.assert_not_called()
    verdict.assert_not_called()
    assert sent == []
    assert recon.unblocked_total == 0
    assert recon.skipped_terminal_total == 1


# ---------------------------------------------------------------------------
# TC-03 (AC-2): Cancelled terminal is also excluded.
# ---------------------------------------------------------------------------
def test_tc03_cancelled_excluded(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_states_with_terminals(monkeypatch)
    sent = _spy_telegram(monkeypatch)
    _make_task("iss-cancel", stage="done", wi="ET-003")
    _patch_issues(monkeypatch, [
        {"id": "iss-cancel", "state": {"id": _CANCELLED}, "updated_at": _OLD_TS},
    ])

    recon = Reconciler()
    recon.reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()
    assert sent == []
    assert recon.unblocked_total == 0
    assert recon.skipped_terminal_total == 1


# ---------------------------------------------------------------------------
# TC-04 (AC-3): no-op dispatch (stage unchanged) -> no notification.
# ---------------------------------------------------------------------------
def test_tc04_noop_dispatch_no_unblock(monkeypatch, single_project):
    # handle_verdict is a no-op AsyncMock -> the task stage never moves.
    start, verdict = _patch_handlers(monkeypatch)
    sent = _spy_telegram(monkeypatch)
    _make_task("iss-noop", stage="review")
    _patch_issues(monkeypatch, [
        {"id": "iss-noop", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])

    recon = Reconciler()
    recon.reconcile_plane_once()

    # The handler was replayed (idempotent), but nothing changed -> silence.
    assert verdict.call_count == 1
    start.assert_not_called()  # task exists -> verdict path only (cf. TC-12)
    assert sent == []
    assert recon.unblocked_total == 0


# ---------------------------------------------------------------------------
# TC-05 (AC-4): two consecutive ticks on a synced task -> 0 repeat unblocks;
# plus a direct check of the in-memory dedup guard.
# ---------------------------------------------------------------------------
def test_tc05_dedup_no_repeat_notification(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_states_with_terminals(monkeypatch)
    sent = _spy_telegram(monkeypatch)
    _make_task("iss-dedup", stage="done", wi="ET-004")
    _patch_issues(monkeypatch, [
        {"id": "iss-dedup", "state": {"id": _DONE}, "updated_at": _OLD_TS},
    ])

    recon = Reconciler()
    recon.reconcile_plane_once()
    recon.reconcile_plane_once()

    # A synchronized Done task is never replayed, on either tick (cf. TC-01).
    start.assert_not_called()
    verdict.assert_not_called()
    assert sent == []
    assert recon.unblocked_total == 0

    # Direct dedup-guard exercise: the same issue+state notifies at most once.
    recon._note_unblock("ET-004", "review", "state-x")
    recon._note_unblock("ET-004", "review", "state-x")
    assert recon.unblocked_total == 1
    assert recon.deduped_total == 1


# ---------------------------------------------------------------------------
# TC-06 (AC-5): legit lost Approved webhook -> replayed, advanced, ONE unblock.
# ---------------------------------------------------------------------------
def test_tc06_legit_approved_unblock_once(monkeypatch, single_project):
    # Non-terminal "approved" group -> the issue is actionable.
    _patch_states_with_terminals(monkeypatch)
    telegram_log = _spy_telegram(monkeypatch)
    _make_task("iss-appr", stage="review", wi="ET-005")

    async def fake_verdict(issue_data, project_id, approved=True):
        """Stand-in for the real handler: advance the stage review -> testing."""
        db = get_db()
        db.execute(
            "UPDATE tasks SET stage='testing' WHERE plane_id=?",
            (issue_data["id"],),
        )
        db.commit()
        db.close()

    monkeypatch.setattr(reconciler_mod, "handle_verdict", fake_verdict)
    monkeypatch.setattr(reconciler_mod, "handle_status_start", AsyncMock())
    _patch_issues(monkeypatch, [
        {"id": "iss-appr", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])

    reconciler = Reconciler()
    reconciler.reconcile_plane_once()

    assert reconciler.unblocked_total == 1
    assert len(telegram_log) == 1
    assert "ET-005" in telegram_log[0]


# ---------------------------------------------------------------------------
# TC-07 (AC-6): lost In Progress start (task appears) and lost Rejected
# rollback (stage moves) each fire exactly one unblock.
# ---------------------------------------------------------------------------
def test_tc07_in_progress_start_and_rejected_each_one_unblock(
    monkeypatch, single_project
):
    _patch_states_with_terminals(monkeypatch)
    telegram_log = _spy_telegram(monkeypatch)

    async def fake_start(issue_data, project_id):
        """Stand-in for the real start handler: the task row now exists."""
        _make_task(issue_data["id"], stage="analysis", wi="ET-006")

    async def fake_verdict(issue_data, project_id, approved=True):
        """Stand-in for the real rejection path: roll the stage back."""
        db = get_db()
        db.execute(
            "UPDATE tasks SET stage='development' WHERE plane_id=?",
            (issue_data["id"],),
        )
        db.commit()
        db.close()

    monkeypatch.setattr(reconciler_mod, "handle_status_start", fake_start)
    monkeypatch.setattr(reconciler_mod, "handle_verdict", fake_verdict)
    # Only the Rejected issue has a task up front (at review); the In Progress
    # one gets its task from fake_start.
    _make_task("iss-rej", stage="review", wi="ET-007")
    _patch_issues(monkeypatch, [
        {"id": "iss-start", "state": {"id": _IN_PROGRESS}, "updated_at": _OLD_TS},
        {"id": "iss-rej", "state": {"id": _REJECTED}, "updated_at": _OLD_TS},
    ])

    reconciler = Reconciler()
    reconciler.reconcile_plane_once()

    assert reconciler.unblocked_total == 2
    assert len(telegram_log) == 2


# ---------------------------------------------------------------------------
# TC-08 (AC-8): never-raise — a failing dependency isolates to its unit of work.
# ---------------------------------------------------------------------------
def test_tc08_never_raise_isolation(monkeypatch, single_project):
    _patch_states_with_terminals(monkeypatch)
    monkeypatch.setattr(reconciler_mod, "send_telegram", lambda msg: None)

    # Phase 1: _dispatch explodes for the one issue -> isolated, tick survives.
    def exploding_dispatch(*args, **kwargs):
        raise RuntimeError("handler exploded")

    monkeypatch.setattr(Reconciler, "_dispatch", staticmethod(exploding_dispatch))
    _make_task("iss-boom", stage="review", wi="ET-008")
    _patch_issues(monkeypatch, [
        {"id": "iss-boom", "state": {"id": _APPROVED}, "updated_at": _OLD_TS},
    ])
    reconciler = Reconciler()
    reconciler.reconcile_plane_once()  # must NOT raise
    assert reconciler.unblocked_total == 0

    # Phase 2: listing itself explodes -> per-project isolation, still no crash.
    def exploding_list(pid, states):
        raise RuntimeError("plane down")

    monkeypatch.setattr(reconciler_mod, "list_issues_by_state", exploding_list)
    reconciler.reconcile_plane_once()  # must NOT raise


# ---------------------------------------------------------------------------
# TC-09 (AC-9): kill-switches mute F-2.
# ---------------------------------------------------------------------------
def test_tc09_kill_switches(monkeypatch, single_project):
    start, verdict = _patch_handlers(monkeypatch)
    _patch_states_with_terminals(monkeypatch)
    called = {"list": 0}

    def counting_list(pid, states):
        called["list"] += 1
        return [{"id": "iss-x", "state": {"id": _APPROVED}, "updated_at": _OLD_TS}]

    monkeypatch.setattr(reconciler_mod, "list_issues_by_state", counting_list)

    # Phase 1: only the GLOBAL switch is off. The F-2 switch is pinned on, so
    # this phase cannot pass merely because F-2 happens to default to disabled.
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_enabled", False)
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_plane_enabled", True)
    Reconciler().reconcile_plane_once()
    assert called["list"] == 0  # global switch off -> F-2 never runs

    # Phase 2: global switch on, F-2 switch off.
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_enabled", True)
    monkeypatch.setattr(reconciler_mod.settings, "reconcile_plane_enabled", False)
    Reconciler().reconcile_plane_once()
    assert called["list"] == 0  # F-2 switch off -> still no poll

    # Muted F-2 never replays anything either.
    start.assert_not_called()
    verdict.assert_not_called()


# ---------------------------------------------------------------------------
# TC-10 (AC-1, AC-2): end-to-end on BOTH registry projects (enduro AND
# orchestrator): a Done task on each -> 0 notifications / 0 jobs, regardless
# of per-project status aliasing. The headline regression test.
# ---------------------------------------------------------------------------
def test_tc10_done_silent_on_all_projects(monkeypatch):
    from src import projects as projects_mod

    projects_mod.reload_projects()
    assert len({p.plane_project_id for p in projects_mod.PROJECTS}) >= 2

    start, verdict = _patch_handlers(monkeypatch)
    sent = _spy_telegram(monkeypatch)
    states = {
        "in_progress": _IN_PROGRESS,
        "approved": _APPROVED,
        "rejected": _REJECTED,
        "done": _DONE,
        "cancelled": _CANCELLED,
    }
    groups = {_DONE: "completed", _CANCELLED: "cancelled"}
    monkeypatch.setattr(reconciler_mod, "get_project_states", lambda pid: states)
    monkeypatch.setattr(
        reconciler_mod, "get_project_state_groups", lambda pid: groups
    )
    # Each project returns a Done issue (unique id per project).
    monkeypatch.setattr(
        reconciler_mod,
        "list_issues_by_state",
        lambda pid, st: [
            {"id": f"done-{pid}", "state": {"id": _DONE}, "updated_at": _OLD_TS}
        ],
    )

    recon = Reconciler()
    recon.reconcile_plane_once()

    start.assert_not_called()
    verdict.assert_not_called()
    assert sent == []
    assert recon.unblocked_total == 0
    assert recon.skipped_terminal_total >= 2  # one per project
    assert _job_count() == 0