"""ORCH-088 — serial gate end-to-end queue behaviour (real tmp SQLite). Covers (04-test-plan.yaml): TC-04 after A.stage='done' the waiting analyst-job of B is claimed (gate opens automatically — no manual action). TC-05 a queue of 3 tasks of one repo is processed strictly one-at-a-time, FIFO by jobs.id: while A is unfinished neither B nor C starts. TC-06 restart-safe: the active task is derived from the DB (tasks.repo + stage!='done'), not in-memory — re-reading state keeps the gate closed. """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_e2e.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 from src import config as cfg # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "e2e.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) init_db() yield def _make_task(work_item_id, stage="analysis", repo="orchestrator"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage), ) tid = cur.lastrowid conn.commit() conn.close() return tid def _set_stage(task_id, stage): conn = get_db() conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) conn.commit() conn.close() # --------------------------------------------------------------- TC-04 def test_next_starts_automatically_when_predecessor_done(): a = _make_task("ORCH-301", stage="development") b = _make_task("ORCH-302", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) assert claim_next_job() is None, "B gated while A unfinished" # A reaches done -> the gate opens on the NEXT claim tick, no manual action. _set_stage(a, "done") claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b # --------------------------------------------------------------- TC-05 def test_three_tasks_processed_one_at_a_time_fifo(): a = _make_task("ORCH-310", stage="analysis") b = _make_task("ORCH-311", stage="analysis") c = _make_task("ORCH-312", stage="analysis") job_a = enqueue_job("analyst", "orchestrator", "A", task_id=a) job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) job_c = enqueue_job("analyst", "orchestrator", "C", task_id=c) # Only the FIFO-first task (A, lowest id) is claimable. claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_a assert claim_next_job() is None, "B and C must wait while A is unfinished" # A runs through to done; now B (next) is claimable, C still waits. db.mark_job(job_a, "done") _set_stage(a, "done") claimed_b = claim_next_job() assert claimed_b is not None and claimed_b["id"] == job_b assert claim_next_job() is None, "C must wait while B is unfinished" # B done -> C claimable last (strict FIFO order preserved). db.mark_job(job_b, "done") _set_stage(b, "done") claimed_c = claim_next_job() assert claimed_c is not None and claimed_c["id"] == job_c # --------------------------------------------------------------- TC-06 def test_restart_safe_active_task_from_db(): a = _make_task("ORCH-320", stage="development") b = _make_task("ORCH-321", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) assert claim_next_job() is None # Simulate a restart: there is NO in-memory state — the gate recomputes purely # from the DB. Re-running init_db (idempotent) + a fresh claim must still gate B. init_db() assert claim_next_job() is None, "after restart the gate is still closed (DB-derived)" _set_stage(a, "done") claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b