"""ORCH-026 — /queue task_deps observability (TC-G02, G-2). task_deps.snapshot() is a read-only summary (NOT a source of truth) exposing the declared edges, blocked tasks and any detected cycle. It must never raise. """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_queue_obs.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import task_deps # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "obs.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) monkeypatch.setattr(db.settings, "task_deps_source", "db", raising=False) init_db() yield def _make_task(work_item_id, stage="development"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", (work_item_id, work_item_id, "orchestrator", f"feature/{work_item_id}", stage), ) tid = cur.lastrowid conn.commit() conn.close() return tid def test_snapshot_shape_empty(): snap = task_deps.snapshot() assert snap["enabled"] is True assert snap["source"] == "db" assert snap["edges"] == 0 assert snap["blocked_tasks"] == [] assert snap["cycle"] is None def test_snapshot_reports_blocked_task(): a = _make_task("ORCH-100", stage="development") b = _make_task("ORCH-101", stage="development") db.add_dependency(b, a) snap = task_deps.snapshot() assert snap["edges"] == 1 assert len(snap["blocked_tasks"]) == 1 bt = snap["blocked_tasks"][0] assert bt["work_item_id"] == "ORCH-101" assert "ORCH-100" in bt["waiting_on"] assert snap["cycle"] is None def test_snapshot_reports_cycle(): a = _make_task("ORCH-102") b = _make_task("ORCH-103") db.add_dependency(a, b) db.add_dependency(b, a) snap = task_deps.snapshot() assert snap["cycle"] is not None assert "ORCH-102" in snap["cycle"] or "ORCH-103" in snap["cycle"] def test_snapshot_never_raises(monkeypatch): monkeypatch.setattr(db, "get_dependency_edges", lambda: (_ for _ in ()).throw(RuntimeError("db down")), raising=False) snap = task_deps.snapshot() assert snap["edges"] == 0 assert snap["blocked_tasks"] == [] def test_queue_endpoint_includes_task_deps(monkeypatch): """GET /queue payload carries the task_deps block (read-only).""" import asyncio from src import main payload = asyncio.run(main.queue()) assert "task_deps" in payload assert "enabled" in payload["task_deps"]