"""ORCH-026 (Level B): declarative task-dependency logic. Leaf module — pure, unit-testable functions over the additive ``job_deps`` table (see src/db.py / 08-data-requirements.md). It answers two questions the rest of the pipeline asks: * "is task B ready to run?" — every declared predecessor A reached ``tasks.stage = 'done'`` (``is_task_ready``). The scheduler gate in ``db.claim_next_job`` enforces the same predicate in SQL; this Python copy is for the reconciler skip and for naming WHAT a task waits on (visibility). * "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` / ``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it to Blocked + alert so the deadlock is visible instead of silent. never-raise contract (AC-G1, self-hosting safety): EVERY public function degrades conservatively on any error (DB/import) and NEVER propagates an exception into the worker / reconciler / webhook. Readiness fails OPEN (``True``) so a transient DB error cannot wedge the whole queue; cycle detection fails CLOSED-safe (``None`` = "no cycle proven", do not block). """ from __future__ import annotations import logging from . import db from .config import settings logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Readiness gate (B-2) # --------------------------------------------------------------------------- def is_task_ready(task_id: int) -> tuple[bool, list[str]]: """Return ``(ready, waiting_on)`` for a task. ``ready`` is True when the task has no declared dependency whose predecessor is still un-done. ORCH-090 (adr-0026): the terminal set is ``{done, cancelled}`` — a CANCELLED predecessor is terminal and no longer blocks the dependent (the actual SQL predicate lives in ``db.get_unfinished_dependencies`` / ``db.claim_next_job``). ``waiting_on`` is the list of predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still blocked by — used for the Telegram waiting-line / Plane visibility. never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the scheduler omitting the gate when the DB read fails; a transient error must not wedge an otherwise-claimable task). """ if task_id is None: return True, [] try: unfinished = db.get_unfinished_dependencies(task_id) except Exception: return True, [] if not unfinished: return True, [] waiting_on = [ str(d.get("work_item_id") or d.get("id")) for d in unfinished ] return False, waiting_on # --------------------------------------------------------------------------- # Cycle / deadlock detection (B-3) # --------------------------------------------------------------------------- def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]: """Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map. Edge direction follows the dependency: an edge (B, A) means "B depends on A", so we traverse from a dependent task towards its predecessors. A cycle in this graph is an unsatisfiable deadlock. """ adj: dict[int, list[int]] = {} for task_id, depends_on in edges: adj.setdefault(task_id, []).append(depends_on) return adj def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None: """Iterative DFS from ``start``; return a cycle path if one is reachable. Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None. Iterative (explicit stack) so a pathological deep graph cannot blow the Python recursion limit — relevant on the shared prod process. """ WHITE, GREY, BLACK = 0, 1, 2 color: dict[int, int] = {} parent: dict[int, int] = {} # stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK). stack: list[tuple[int, bool]] = [(start, False)] while stack: node, is_exit = stack.pop() if is_exit: color[node] = BLACK continue if color.get(node, WHITE) != WHITE: continue color[node] = GREY stack.append((node, True)) for nxt in adj.get(node, []): c = color.get(nxt, WHITE) if c == GREY: # Back-edge -> cycle. Reconstruct path nxt..node via parent. path = [node] cur = node while cur != nxt and cur in parent: cur = parent[cur] path.append(cur) path.reverse() path.append(nxt) return path if c == WHITE: parent[nxt] = node stack.append((nxt, False)) return None def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None: """Detect a dependency cycle reachable from ``task_id``. Returns the cycle path (node sequence, first == last) or None when the graph reachable from ``task_id`` is acyclic. ``edges`` may be injected (unit tests); otherwise the full declared edge set is read from the DB. never-raise: any error -> None (do not falsely claim a deadlock on an error). """ if task_id is None: return None try: if edges is None: edges = db.get_dependency_edges() adj = _build_adjacency(edges) return _find_cycle_from(task_id, adj) except Exception: return None def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None: """Backstop: detect ANY cycle in the whole declared graph. Used by the reconciler tick to surface a deadlock even when no specific task is being evaluated. Returns the first cycle found or None. never-raise -> None. """ try: if edges is None: edges = db.get_dependency_edges() adj = _build_adjacency(edges) for node in list(adj.keys()): cyc = _find_cycle_from(node, adj) if cyc: return cyc return None except Exception: return None def _work_item_id_for(task_id: int) -> str | None: """Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise).""" try: conn = db.get_db() try: row = conn.execute( "SELECT work_item_id FROM tasks WHERE id = ?", (task_id,) ).fetchone() finally: conn.close() return row[0] if row and row[0] else None except Exception: return None def handle_cycle(cycle: list[int]) -> bool: """Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1). For every task in the cycle, sets its Plane issue to Blocked (best-effort) and sends ONE Telegram alert naming the cycle, so a deadlock is visible instead of a silent forever-wait. Does NOT mutate job_deps / stages — the declaration is the human's to fix. never-raise: any notify/Plane error is swallowed; the worker/reconciler never crashes. Returns True if an alert was attempted, False on a no-op / error. """ if not cycle: return False try: # Map task ids -> work-item ids for a human-readable chain. labels: list[str] = [] seen: set[int] = set() for tid in cycle: wi = _work_item_id_for(tid) labels.append(wi or f"task#{tid}") if tid not in seen: seen.add(tid) chain = " -> ".join(labels) try: from . import notifications, plane_sync except Exception: return False # Blocked indication on each distinct issue in the cycle. for tid in seen: wi = _work_item_id_for(tid) if wi: try: plane_sync.set_issue_blocked(wi) except Exception: pass try: notifications.send_telegram( f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. " f"Tasks set to Blocked — fix the blocked-by declaration." ) except Exception: pass logger.error("ORCH-026: dependency cycle detected: %s", chain) return True except Exception: return False # --------------------------------------------------------------------------- # Declaration (B-1) — db.add_dependency + immediate cycle escalation # --------------------------------------------------------------------------- def declare_dependency(task_id: int, depends_on_task_id: int) -> bool: """Declare "task_id (B) blocked-by depends_on_task_id (A)" and check for a cycle. Thin wrapper over ``db.add_dependency`` that, after a successful insert, runs ``detect_cycle`` from the new dependent — so a freshly-introduced deadlock is surfaced (Blocked + alert) at declaration time (best UX, ADR B-3) rather than only by the reconciler backstop. The edge is NOT rolled back on a cycle (the SQL gate already keeps the cyclic tasks un-claimable; the human fixes the declaration) — we make it VISIBLE. never-raise: any error -> False. Returns True iff a NEW edge row was inserted (idempotent re-declaration -> False, matching db.add_dependency). """ try: inserted = db.add_dependency(task_id, depends_on_task_id) # Always check for a cycle (even on a duplicate edge an existing cycle may # now be relevant), but only escalate when one is actually found. cyc = detect_cycle(task_id) if cyc: handle_cycle(cyc) return inserted except Exception: return False def ingest_plane_relations( task_id: int, issue_id: str, project_id: str ) -> int: """B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps. Reads the issue's ``blocked_by`` predecessors from Plane, resolves each to a local ``tasks.id`` (intra-repo only, v1) and declares the edge. A predecessor not yet known locally (no task row) is SKIPPED — the scheduler can only gate on tasks it knows; a re-run after that task is created will pick it up. Active ONLY when ``task_deps_source`` is ``plane`` or ``hybrid`` (default ``db`` -> no Plane call on the hot creation path). never-raise (self-hosting): any error -> 0 edges, the pipeline start is never blocked by Plane. Returns the number of edges declared. """ source = (getattr(settings, "task_deps_source", "db") or "db").strip().lower() if source not in ("plane", "hybrid"): return 0 if not issue_id or not project_id: return 0 try: from . import plane_sync blocked_by = plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id) except Exception: return 0 declared = 0 for pred_issue in blocked_by: try: pred = db.get_task_by_plane_id(str(pred_issue)) if not pred: continue if declare_dependency(task_id, pred["id"]): declared += 1 except Exception: continue return declared # --------------------------------------------------------------------------- # Observability (/queue snapshot, G-2) # --------------------------------------------------------------------------- def snapshot() -> dict: """Read-only summary of the dependency subsystem for GET /queue (G-2). Returns a dict (NOT a source of truth — pure observability): * ``enabled`` — task_deps_enabled flag; * ``source`` — task_deps_source (db|plane|hybrid); * ``edges`` — number of declared edges; * ``blocked_tasks`` — list of ``{task_id, work_item_id, waiting_on}`` for tasks with at least one un-done predecessor; * ``cycle`` — a detected cycle path (work-item labels) or None. never-raise: any error -> a minimal dict with the flags and empty data. """ enabled = bool(getattr(settings, "task_deps_enabled", False)) source = getattr(settings, "task_deps_source", "db") try: edges = db.get_dependency_edges() blocked: list[dict] = [] for task_id in {e[0] for e in edges}: ready, waiting_on = is_task_ready(task_id) if not ready: blocked.append({ "task_id": task_id, "work_item_id": _work_item_id_for(task_id), "waiting_on": waiting_on, }) cyc = find_any_cycle(edges) cycle_labels = None if cyc: cycle_labels = [(_work_item_id_for(t) or f"task#{t}") for t in cyc] return { "enabled": enabled, "source": source, "edges": len(edges), "blocked_tasks": blocked, "cycle": cycle_labels, } except Exception: return { "enabled": enabled, "source": source, "edges": 0, "blocked_tasks": [], "cycle": None, }