Files
orchestrator/src/task_deps.py
claude-bot a74379f657 feat(ORCH-026): task dependencies (B waits for A) + single-repo merge serialization
Level A — merge/deploy serialization within one repo: reuse the existing
ORCH-043/065 merge-lease (no new mechanism); the only new logic is an
unconditional pre-merge rebase in check_branch_mergeable — under the held
lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always
(default True), not just when the branch is behind. No-op on an up-to-date
branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI
not triggered). Kill-switch off -> ORCH-043 behaviour 1:1.

Level B — declarative task dependencies: additive job_deps table
(CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate
(NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without
occupying a max_concurrency slot; inert on empty job_deps -> zero regression.
New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle
detection + Blocked/alert, declare/ingest_plane_relations (db source never
hits the network on the hot path), snapshot. Telegram waiting-line, /queue
observability, reconciler skip + cycle backstop, reaper untouched.

Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (the dep gate is an
inline hook inside claim_next_job, not a registered QG), DB schema of existing tables,
HTTP endpoints; non-self repos remain a no-op on empty deps/scope.

Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE.
Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md,
adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green.

Refs: ORCH-026

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-08 19:17:44 +03:00

336 lines
13 KiB
Python

"""ORCH-026 (Level B): declarative task-dependency logic.
Leaf module — pure, unit-testable functions over the additive ``job_deps`` table
(see src/db.py / 08-data-requirements.md). It answers two questions the rest of
the pipeline asks:
* "is task B ready to run?" — every declared predecessor A reached
``tasks.stage = 'done'`` (``is_task_ready``). The scheduler gate in
``db.claim_next_job`` enforces the same predicate in SQL; this Python copy is
for the reconciler skip and for naming WHAT a task waits on (visibility).
* "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can
never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` /
``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it
to Blocked + alert so the deadlock is visible instead of silent.
never-raise contract (AC-G1, self-hosting safety): EVERY public function
degrades conservatively on any error (DB/import) and NEVER propagates an
exception into the worker / reconciler / webhook. Readiness fails OPEN
(``True``) so a transient DB error cannot wedge the whole queue; cycle detection
fails CLOSED-safe (``None`` = "no cycle proven", do not block).
"""
from __future__ import annotations
import logging
from . import db
from .config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Readiness gate (B-2)
# ---------------------------------------------------------------------------
def is_task_ready(task_id: int) -> tuple[bool, list[str]]:
    """Return ``(ready, waiting_on)`` for a task.

    ``ready`` is True when ``db.get_unfinished_dependencies`` reports no
    outstanding predecessor for ``task_id``. ``waiting_on`` lists the
    predecessors the task is still blocked by, each rendered as the row's
    ``work_item_id`` (falling back to its ``id``), e.g. ``["ORCH-010"]`` —
    used for the Telegram waiting-line / Plane visibility.

    never-raise: any error — a failed DB read OR a malformed row — yields
    ``(True, [])`` (fail OPEN: a transient error must not wedge an otherwise
    claimable task). The failure is logged so the fail-open decision leaves
    a trace instead of passing silently.
    """
    if task_id is None:
        return True, []
    try:
        unfinished = db.get_unfinished_dependencies(task_id)
        if not unfinished:
            return True, []
        # Row formatting stays inside the guard: a row that is not dict-like
        # must degrade to "ready" rather than escape into the caller.
        waiting_on = [
            str(d.get("work_item_id") or d.get("id"))
            for d in unfinished
        ]
    except Exception:
        logger.warning(
            "ORCH-026: readiness check failed for task %s; treating it as ready",
            task_id,
            exc_info=True,
        )
        return True, []
    return False, waiting_on
# ---------------------------------------------------------------------------
# Cycle / deadlock detection (B-3)
# ---------------------------------------------------------------------------
def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]:
"""Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map.
Edge direction follows the dependency: an edge (B, A) means "B depends on A",
so we traverse from a dependent task towards its predecessors. A cycle in
this graph is an unsatisfiable deadlock.
"""
adj: dict[int, list[int]] = {}
for task_id, depends_on in edges:
adj.setdefault(task_id, []).append(depends_on)
return adj
def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None:
"""Iterative DFS from ``start``; return a cycle path if one is reachable.
Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None.
Iterative (explicit stack) so a pathological deep graph cannot blow the
Python recursion limit — relevant on the shared prod process.
"""
WHITE, GREY, BLACK = 0, 1, 2
color: dict[int, int] = {}
parent: dict[int, int] = {}
# stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK).
stack: list[tuple[int, bool]] = [(start, False)]
while stack:
node, is_exit = stack.pop()
if is_exit:
color[node] = BLACK
continue
if color.get(node, WHITE) != WHITE:
continue
color[node] = GREY
stack.append((node, True))
for nxt in adj.get(node, []):
c = color.get(nxt, WHITE)
if c == GREY:
# Back-edge -> cycle. Reconstruct path nxt..node via parent.
path = [node]
cur = node
while cur != nxt and cur in parent:
cur = parent[cur]
path.append(cur)
path.reverse()
path.append(nxt)
return path
if c == WHITE:
parent[nxt] = node
stack.append((nxt, False))
return None
def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None:
    """Look for a dependency cycle reachable from ``task_id``.

    ``edges`` may be supplied directly (handy in unit tests); when it is
    ``None`` the declared edge set is loaded via ``db.get_dependency_edges``.
    The result is the cycle as a node sequence whose first and last entries
    match, or ``None`` if the graph reachable from ``task_id`` is acyclic.

    never-raise: a ``None`` task id or any error yields ``None`` — an error
    must never be reported as a deadlock.
    """
    if task_id is None:
        return None
    try:
        edge_list = db.get_dependency_edges() if edges is None else edges
        return _find_cycle_from(task_id, _build_adjacency(edge_list))
    except Exception:
        return None
def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None:
    """Backstop scan: report ANY cycle in the whole declared graph.

    Meant for callers that have no particular task in hand (the reconciler
    tick). Every task that has at least one declared dependency is tried as
    a starting point, in adjacency-map order, and the first cycle found is
    returned. ``edges`` may be injected; otherwise they are read from the DB.

    never-raise: any error yields ``None``.
    """
    try:
        edge_list = db.get_dependency_edges() if edges is None else edges
        graph = _build_adjacency(edge_list)
        # Lazy generator: the search stops at the first start node that
        # yields a cycle, exactly like an early ``return`` inside a loop.
        candidates = (_find_cycle_from(origin, graph) for origin in tuple(graph))
        return next((found for found in candidates if found), None)
    except Exception:
        return None
def _work_item_id_for(task_id: int) -> str | None:
"""Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise)."""
try:
conn = db.get_db()
try:
row = conn.execute(
"SELECT work_item_id FROM tasks WHERE id = ?", (task_id,)
).fetchone()
finally:
conn.close()
return row[0] if row and row[0] else None
except Exception:
return None
def handle_cycle(cycle: list[int]) -> bool:
    """Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1).

    For every distinct task in ``cycle`` the Plane issue is set to Blocked
    (best-effort) and ONE Telegram alert naming the whole chain is sent, so a
    deadlock is visible instead of a silent forever-wait. Does NOT mutate
    job_deps / stages — the declaration is the human's to fix.

    Each task's work-item id is resolved once and reused for both the
    human-readable chain and the Blocked call. The cycle is written to the
    error log BEFORE the notification modules are imported, so the deadlock
    is recorded even when those modules cannot be loaded.

    never-raise: Plane / Telegram failures are logged and skipped; nothing
    propagates into the worker / reconciler. Returns True if an alert was
    attempted, False on an empty cycle, an import failure or any other error.
    """
    if not cycle:
        return False
    try:
        # task id -> work-item id (or None), first-seen order preserved so
        # the Blocked calls follow the cycle rather than set-iteration order.
        work_items: dict[int, str | None] = {}
        for tid in cycle:
            if tid not in work_items:
                work_items[tid] = _work_item_id_for(tid)
        chain = " -> ".join(work_items[tid] or f"task#{tid}" for tid in cycle)
        logger.error("ORCH-026: dependency cycle detected: %s", chain)
        try:
            from . import notifications, plane_sync
        except Exception:
            logger.warning(
                "ORCH-026: cannot load notification modules; cycle not escalated",
                exc_info=True,
            )
            return False
        # Blocked indication on each distinct issue in the cycle; tasks
        # without a known work-item id have nothing to mark.
        for wi in work_items.values():
            if not wi:
                continue
            try:
                plane_sync.set_issue_blocked(wi)
            except Exception:
                logger.warning(
                    "ORCH-026: could not set %s to Blocked", wi, exc_info=True
                )
        try:
            notifications.send_telegram(
                f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. "
                f"Tasks set to Blocked — fix the blocked-by declaration."
            )
        except Exception:
            logger.warning(
                "ORCH-026: deadlock alert could not be sent", exc_info=True
            )
        return True
    except Exception:
        return False
# ---------------------------------------------------------------------------
# Declaration (B-1) — db.add_dependency + immediate cycle escalation
# ---------------------------------------------------------------------------
def declare_dependency(task_id: int, depends_on_task_id: int) -> bool:
    """Record "task_id (B) is blocked by depends_on_task_id (A)" and check for a cycle.

    Delegates the insert to ``db.add_dependency`` and then runs
    ``detect_cycle`` from the dependent task, so a deadlock introduced by
    this declaration is escalated (``handle_cycle``: Blocked + alert) right
    away instead of waiting for the reconciler backstop. The edge is left in
    place even when a cycle is found — the goal is to make the problem
    VISIBLE; the human fixes the declaration.

    The cycle check runs on duplicate declarations too, since an existing
    cycle may have become relevant; escalation only happens when a cycle is
    actually found.

    Returns whatever ``db.add_dependency`` reported (per the original
    contract: True only for a NEW edge row, False for an idempotent
    re-declaration). never-raise: any error yields False.
    """
    try:
        was_new = db.add_dependency(task_id, depends_on_task_id)
        loop = detect_cycle(task_id)
        if loop:
            handle_cycle(loop)
    except Exception:
        return False
    return was_new
def ingest_plane_relations(
    task_id: int, issue_id: str, project_id: str
) -> int:
    """B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps.

    Asks ``plane_sync.fetch_blocked_by_issue_ids`` for the issue's
    predecessors, resolves each one to a local task through
    ``db.get_task_by_plane_id`` and declares the edge with
    ``declare_dependency``. A predecessor with no local task row is SKIPPED —
    the scheduler can only gate on tasks it knows; a later re-run can pick
    it up once that task exists.

    Active ONLY when ``settings.task_deps_source`` is ``plane`` or ``hybrid``
    (anything else, including the default ``db``, returns 0 without touching
    Plane). An empty ``issue_id`` / ``project_id`` also returns 0.

    never-raise (self-hosting): an unreadable setting, a failed Plane call,
    or a ``None`` / non-iterable answer from Plane all yield 0; a failure on
    one predecessor skips just that predecessor. Returns the number of NEW
    edges declared.
    """
    try:
        # str() so a non-string setting cannot break .strip()/.lower().
        source = str(getattr(settings, "task_deps_source", "db") or "db").strip().lower()
    except Exception:
        return 0
    if source not in ("plane", "hybrid"):
        return 0
    if not issue_id or not project_id:
        return 0
    try:
        from . import plane_sync
        # Materialise inside the guard: ``None`` becomes "no relations" and a
        # non-iterable answer is caught here instead of escaping the loop.
        blocked_by = list(
            plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id) or ()
        )
    except Exception:
        logger.warning(
            "ORCH-026: could not read blocked-by relations for issue %s",
            issue_id,
            exc_info=True,
        )
        return 0
    declared = 0
    for pred_issue in blocked_by:
        try:
            pred = db.get_task_by_plane_id(str(pred_issue))
            if not pred:
                continue
            if declare_dependency(task_id, pred["id"]):
                declared += 1
        except Exception:
            # One bad predecessor must not abort the rest of the import.
            continue
    return declared
# ---------------------------------------------------------------------------
# Observability (/queue snapshot, G-2)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
    """Build the read-only dependency summary served by GET /queue (G-2).

    Pure observability — NOT a source of truth. Keys of the returned dict:

    * ``enabled`` — the ``task_deps_enabled`` setting as a bool;
    * ``source`` — the ``task_deps_source`` setting (default ``"db"``);
    * ``edges`` — number of declared edges;
    * ``blocked_tasks`` — one ``{task_id, work_item_id, waiting_on}`` entry
      per dependent task that ``is_task_ready`` reports as not ready;
    * ``cycle`` — labels (work-item id, or ``task#<id>``) of a detected
      cycle, or None.

    never-raise: if gathering the data fails, the two flags are returned
    with ``edges`` = 0, an empty ``blocked_tasks`` list and ``cycle`` = None.
    """
    report = {
        "enabled": bool(getattr(settings, "task_deps_enabled", False)),
        "source": getattr(settings, "task_deps_source", "db"),
        "edges": 0,
        "blocked_tasks": [],
        "cycle": None,
    }
    try:
        edges = db.get_dependency_edges()
        waiting: list[dict] = []
        # Only tasks that appear as a dependent can be blocked.
        for dependent in {edge[0] for edge in edges}:
            ready, waiting_on = is_task_ready(dependent)
            if ready:
                continue
            waiting.append({
                "task_id": dependent,
                "work_item_id": _work_item_id_for(dependent),
                "waiting_on": waiting_on,
            })
        loop = find_any_cycle(edges)
        if loop:
            labels = [(_work_item_id_for(member) or f"task#{member}") for member in loop]
        else:
            labels = None
        edge_count = len(edges)
    except Exception:
        # Degraded view: flags only, no data.
        return report
    report.update(edges=edge_count, blocked_tasks=waiting, cycle=labels)
    return report