"""ORCH-088 — per-repo serial gate, unit tests (real tmp SQLite). Covers (04-test-plan.yaml): TC-01 claim_next_job does NOT claim an analyst-job of a NEW task B while the repo has an unfinished task A (gate closed). TC-02 serial_gate_applies: enabled + empty CSV -> True for any repo; CSV membership -> True; repo outside CSV -> False; disabled -> False. TC-03 jobs of an ALREADY-active task (architect/developer/.../deployer) are never gated — the single active task advances freely. TC-08 per-repo: an active orchestrator task does NOT gate an enduro analyst-job. TC-15 kill-switch off -> claim is 1:1 as before ORCH-088. TC-16 repo outside a non-empty CSV -> gate inert for that repo. TC-17 DB/build error in the gate -> fail-OPEN: claim does not crash, still claims. TC-19 snapshot() shape + never-raise. TC-21 STAGE_TRANSITIONS / QG_CHECKS registries unchanged (no new QG check). """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 from src import serial_gate # noqa: E402 from src import config as cfg # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "sg.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) # Feature ON by default; freeze layer ON; empty CSV (all repos). monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) # Keep the unrelated dep-gate inert so claim semantics isolate the serial gate. monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) init_db() yield def _make_task(work_item_id, stage="analysis", repo="orchestrator"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) " "VALUES (?, ?, ?, ?, ?, ?)", (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id), ) tid = cur.lastrowid conn.commit() conn.close() return tid def _set_stage(task_id, stage): conn = get_db() conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) conn.commit() conn.close() # --------------------------------------------------------------- TC-01 def test_gate_closed_when_repo_has_active_task(): _make_task("ORCH-201", stage="development") # active predecessor b = _make_task("ORCH-202", stage="analysis") # new task job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) # A is unfinished -> the analyst-job of B is NOT claimable. assert claim_next_job() is None, "analyst-job must be gated by active task A" # /queue shows B waiting + an active task for the repo. snap = serial_gate.snapshot() per = snap["per_repo"]["orchestrator"] assert per["active_task"]["work_item_id"] == "ORCH-201" assert any(w["job_id"] == job_b for w in per["waiting"]) # --------------------------------------------------------------- TC-02 def test_serial_gate_applies_scopes(monkeypatch): # enabled + empty CSV -> all repos. assert serial_gate.serial_gate_applies("orchestrator") is True assert serial_gate.serial_gate_applies("enduro-trails") is True # CSV membership. monkeypatch.setattr(cfg.settings, "serial_gate_repos", "orchestrator", raising=False) assert serial_gate.serial_gate_applies("orchestrator") is True assert serial_gate.serial_gate_applies("enduro-trails") is False # kill-switch off -> never applies. monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) assert serial_gate.serial_gate_applies("orchestrator") is False # --------------------------------------------------------------- TC-03 def test_non_analyst_job_of_active_task_passes(): a = _make_task("ORCH-210", stage="development") # an unrelated unfinished task in the same repo (would close the gate for analyst) _make_task("ORCH-211", stage="analysis") for role in ("architect", "developer", "reviewer", "tester", "deployer"): jid = enqueue_job(role, "orchestrator", role, task_id=a) claimed = claim_next_job() assert claimed is not None and claimed["id"] == jid, ( f"{role}-job of an active task must never be gated" ) # finish it so the next role's job is the only queued one. db.mark_job(jid, "done") # --------------------------------------------------------------- TC-08 def test_per_repo_isolation(): # orchestrator busy; enduro gets a brand-new analyst-job. _make_task("ORCH-220", stage="development", repo="orchestrator") b = _make_task("ET-220", stage="analysis", repo="enduro-trails") job_b = enqueue_job("analyst", "enduro-trails", "B", task_id=b) claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b, ( "orchestrator's active task must not gate enduro's analyst-job" ) # --------------------------------------------------------------- TC-15 def test_kill_switch_off_is_inert(monkeypatch): monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) _make_task("ORCH-230", stage="development") # active task b = _make_task("ORCH-231", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b, ( "with the kill-switch off the gate must be inert (claims as before)" ) # --------------------------------------------------------------- TC-16 def test_repo_outside_csv_not_gated(monkeypatch): monkeypatch.setattr(cfg.settings, "serial_gate_repos", "enduro-trails", raising=False) _make_task("ORCH-240", stage="development") # active orchestrator task b = _make_task("ORCH-241", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b, ( "orchestrator is outside the CSV scope -> gate must not apply" ) # --------------------------------------------------------------- TC-17 def test_build_clause_error_fails_open(monkeypatch): """A build error in the gate clause must fail-OPEN (claim still proceeds).""" _make_task("ORCH-250", stage="development") # would close the gate b = _make_task("ORCH-251", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) def _boom(): raise RuntimeError("clause build down") monkeypatch.setattr(serial_gate, "build_claim_clause", _boom, raising=True) claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b, ( "a gate build error must fail-OPEN, not wedge the queue (AC-8)" ) # --------------------------------------------------------------- TC-19 def test_snapshot_shape_and_never_raises(monkeypatch): snap = serial_gate.snapshot() assert snap["enabled"] is True assert "repos" in snap and "freeze_enabled" in snap assert isinstance(snap["per_repo"], dict) # never-raise: a DB failure -> minimal dict with flags, empty per_repo. monkeypatch.setattr( serial_gate, "_known_repos", lambda: (_ for _ in ()).throw(RuntimeError("db down")), raising=True, ) snap2 = serial_gate.snapshot() assert snap2["per_repo"] == {} assert snap2["enabled"] is True # --------------------------------------------------------------- TC-21 def test_registries_unchanged(): from src.stages import STAGE_TRANSITIONS from src.qg.checks import QG_CHECKS # ORCH-090 (adr-0026): `cancelled` is added as a terminal SINK (parallel to # `done`), NOT a new pipeline edge — serial-gate FIFO semantics are unchanged. assert set(STAGE_TRANSITIONS) == { "created", "analysis", "architecture", "development", "review", "testing", "deploy-staging", "deploy", "done", "cancelled", } # No serial-gate QG check was introduced (the gate is a scheduler condition). assert not any("serial" in k for k in QG_CHECKS), "no new QG check expected"