"""ORCH-088 — GET /queue additive serial_gate block (AC-10 / TC-20). The /queue payload must gain an additive ``serial_gate`` block WITHOUT changing any pre-existing key (counts/max_concurrency/reconcile/reaper/post_deploy/ task_deps/recent ...). """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_queue_endpoint.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db # noqa: E402 from src import config as cfg # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "q.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) init_db() yield def test_queue_has_serial_gate_block_and_keeps_existing_keys(): import asyncio from src import main payload = asyncio.run(main.queue()) # Pre-existing keys are all still present (no contract break). for key in ( "counts", "max_concurrency", "poll_interval", "resilience", "reconcile", "reaper", "post_deploy", "merge_verify", "task_deps", "recent", ): assert key in payload, f"existing /queue key '{key}' must be preserved" # New additive block. assert "serial_gate" in payload sg = payload["serial_gate"] assert sg["enabled"] is True assert "repos" in sg and "freeze_enabled" in sg assert isinstance(sg["per_repo"], dict) def test_queue_serial_gate_reflects_freeze(): import asyncio from src import main from src import serial_gate serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-900") payload = asyncio.run(main.queue()) per = payload["serial_gate"]["per_repo"] assert "orchestrator" in per assert per["orchestrator"]["frozen"] is True assert per["orchestrator"]["frozen_reason"] == "DEGRADED" # --- ORCH-019 (TC-13): additive bug_fast_track block ----------------------- def test_queue_has_bug_fast_track_block_and_keeps_existing_keys(monkeypatch): import asyncio from src import main monkeypatch.setattr(cfg.settings, "bug_fast_track_enabled", True, raising=False) payload = asyncio.run(main.queue()) # Pre-existing keys are all still present (no contract break). for key in ("counts", "serial_gate", "coverage", "auto_labels", "stop", "recent"): assert key in payload, f"existing /queue key '{key}' must be preserved" assert "bug_fast_track" in payload bft = payload["bug_fast_track"] assert bft["enabled"] is True assert set(bft) >= { "enabled", "label", "repos", "active_bug_tasks", "total_bug_tasks", "est_saved_architecture_runs", } def test_queue_bug_fast_track_counts_bug_tasks(): import asyncio from src import main conn = db.get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) " "VALUES ('p1','ORCH-401','orchestrator','feature/x','development','t','bug')" ) conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) " "VALUES ('p2','ORCH-402','orchestrator','feature/y','done','t','bug')" ) conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, track) " "VALUES ('p3','ORCH-403','orchestrator','feature/z','development','t','full')" ) conn.commit() conn.close() bft = asyncio.run(main.queue())["bug_fast_track"] assert bft["total_bug_tasks"] == 2 # two bug tasks total assert bft["active_bug_tasks"] == 1 # one non-terminal bug task assert bft["est_saved_architecture_runs"] == 2