Files
orchestrator/src/task_deps.py
claude-bot ad5bd901e3
All checks were successful
CI / test (push) Successful in 33s
CI / test (pull_request) Successful in 32s
feat(cancel): STOP-status task cancellation + relaunch-hole close (ORCH-090)
Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".

Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).

New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.

Refs: ORCH-090

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 21:01:57 +03:00

339 lines
13 KiB
Python

"""ORCH-026 (Level B): declarative task-dependency logic.
Leaf module — pure, unit-testable functions over the additive ``job_deps`` table
(see src/db.py / 08-data-requirements.md). It answers two questions the rest of
the pipeline asks:
* "is task B ready to run?" — every declared predecessor A reached a terminal
stage, i.e. ``tasks.stage`` in ``{done, cancelled}`` since ORCH-090 / adr-0026
(``is_task_ready``). The scheduler gate in ``db.claim_next_job`` enforces the
same predicate in SQL; this Python copy is for the reconciler skip and for
naming WHAT a task waits on (visibility).
* "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can
never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` /
``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it
to Blocked + alert so the deadlock is visible instead of silent.
never-raise contract (AC-G1, self-hosting safety): EVERY public function
degrades conservatively on any error (DB/import) and NEVER propagates an
exception into the worker / reconciler / webhook. Readiness fails OPEN
(``True``) so a transient DB error cannot wedge the whole queue; cycle detection
fails CLOSED-safe (``None`` = "no cycle proven", do not block).
"""
from __future__ import annotations
import logging
from . import db
from .config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Readiness gate (B-2)
# ---------------------------------------------------------------------------
def is_task_ready(task_id: int) -> tuple[bool, list[str]]:
    """Return ``(ready, waiting_on)`` for a task.

    ``ready`` is True when the task has no declared dependency whose
    predecessor is still unfinished. ORCH-090 (adr-0026): the terminal set is
    ``{done, cancelled}`` — a CANCELLED predecessor is terminal and no longer
    blocks the dependent (the actual SQL predicate lives in
    ``db.get_unfinished_dependencies`` / ``db.claim_next_job``).

    ``waiting_on`` is the list of predecessor work-item ids (e.g.
    ``["ORCH-010"]``) the task is still blocked by — used for the Telegram
    waiting-line / Plane visibility. A row without a ``work_item_id`` falls
    back to its ``id``.

    never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the
    scheduler omitting the gate when the DB read fails; a transient error must
    not wedge an otherwise-claimable task). This covers both the DB read and
    the conversion of the returned rows into labels.
    """
    if task_id is None:
        return True, []
    try:
        unfinished = db.get_unfinished_dependencies(task_id)
        if not unfinished:
            return True, []
        # Label building stays inside the guard: a row that is not a mapping
        # (no ``.get``) must degrade to the fail-open answer instead of
        # raising into the worker / reconciler.
        waiting_on = [
            str(d.get("work_item_id") or d.get("id"))
            for d in unfinished
        ]
    except Exception:
        return True, []
    return False, waiting_on
# ---------------------------------------------------------------------------
# Cycle / deadlock detection (B-3)
# ---------------------------------------------------------------------------
def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]:
"""Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map.
Edge direction follows the dependency: an edge (B, A) means "B depends on A",
so we traverse from a dependent task towards its predecessors. A cycle in
this graph is an unsatisfiable deadlock.
"""
adj: dict[int, list[int]] = {}
for task_id, depends_on in edges:
adj.setdefault(task_id, []).append(depends_on)
return adj
def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None:
"""Iterative DFS from ``start``; return a cycle path if one is reachable.
Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None.
Iterative (explicit stack) so a pathological deep graph cannot blow the
Python recursion limit — relevant on the shared prod process.
"""
WHITE, GREY, BLACK = 0, 1, 2
color: dict[int, int] = {}
parent: dict[int, int] = {}
# stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK).
stack: list[tuple[int, bool]] = [(start, False)]
while stack:
node, is_exit = stack.pop()
if is_exit:
color[node] = BLACK
continue
if color.get(node, WHITE) != WHITE:
continue
color[node] = GREY
stack.append((node, True))
for nxt in adj.get(node, []):
c = color.get(nxt, WHITE)
if c == GREY:
# Back-edge -> cycle. Reconstruct path nxt..node via parent.
path = [node]
cur = node
while cur != nxt and cur in parent:
cur = parent[cur]
path.append(cur)
path.reverse()
path.append(nxt)
return path
if c == WHITE:
parent[nxt] = node
stack.append((nxt, False))
return None
def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None:
"""Detect a dependency cycle reachable from ``task_id``.
Returns the cycle path (node sequence, first == last) or None when the graph
reachable from ``task_id`` is acyclic. ``edges`` may be injected (unit tests);
otherwise the full declared edge set is read from the DB.
never-raise: any error -> None (do not falsely claim a deadlock on an error).
"""
if task_id is None:
return None
try:
if edges is None:
edges = db.get_dependency_edges()
adj = _build_adjacency(edges)
return _find_cycle_from(task_id, adj)
except Exception:
return None
def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None:
"""Backstop: detect ANY cycle in the whole declared graph.
Used by the reconciler tick to surface a deadlock even when no specific task
is being evaluated. Returns the first cycle found or None. never-raise -> None.
"""
try:
if edges is None:
edges = db.get_dependency_edges()
adj = _build_adjacency(edges)
for node in list(adj.keys()):
cyc = _find_cycle_from(node, adj)
if cyc:
return cyc
return None
except Exception:
return None
def _work_item_id_for(task_id: int) -> str | None:
"""Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise)."""
try:
conn = db.get_db()
try:
row = conn.execute(
"SELECT work_item_id FROM tasks WHERE id = ?", (task_id,)
).fetchone()
finally:
conn.close()
return row[0] if row and row[0] else None
except Exception:
return None
def handle_cycle(cycle: list[int]) -> bool:
    """Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1).

    For every distinct task in ``cycle`` the Plane issue is set to Blocked
    (best-effort) and ONE Telegram alert naming the whole chain is sent, so a
    deadlock is visible instead of a silent forever-wait. Does NOT mutate
    job_deps / stages — the declaration is the human's to fix.

    The cycle is written to the error log BEFORE any notification is
    attempted, so the deadlock is on record even when the notification or
    Plane modules cannot be imported or their calls fail.

    never-raise: every notify/Plane error is logged and swallowed; the
    worker/reconciler never crashes.

    Returns True if an alert was attempted, False on a no-op (empty cycle) or
    when escalation could not be started.
    """
    if not cycle:
        return False
    try:
        # Resolve each distinct task exactly once. The cycle path repeats its
        # first node at the end, so a per-occurrence lookup would query the
        # same row several times. The dict also keeps first-seen order, which
        # makes the order of the Blocked calls deterministic.
        work_items: dict[int, str | None] = {}
        for tid in cycle:
            if tid not in work_items:
                work_items[tid] = _work_item_id_for(tid)
        # Human-readable chain; tasks without a work-item id are shown by id.
        chain = " -> ".join(work_items[tid] or f"task#{tid}" for tid in cycle)
        # Record the deadlock first: everything after this point is
        # best-effort and must not be the only trace of the problem.
        logger.error("ORCH-026: dependency cycle detected: %s", chain)
        try:
            # Imported lazily so this leaf module has no import-time
            # dependency on the notification / Plane layers.
            from . import notifications, plane_sync
        except Exception:
            logger.warning(
                "ORCH-026: cycle escalation skipped (notification/Plane import failed)",
                exc_info=True,
            )
            return False
        # Blocked indication on each distinct issue in the cycle.
        for wi in work_items.values():
            if not wi:
                continue
            try:
                plane_sync.set_issue_blocked(wi)
            except Exception:
                logger.warning(
                    "ORCH-026: could not set %s to Blocked", wi, exc_info=True
                )
        try:
            notifications.send_telegram(
                f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. "
                f"Tasks set to Blocked — fix the blocked-by declaration."
            )
        except Exception:
            logger.warning(
                "ORCH-026: could not send the deadlock alert", exc_info=True
            )
        return True
    except Exception:
        return False
# ---------------------------------------------------------------------------
# Declaration (B-1) — db.add_dependency + immediate cycle escalation
# ---------------------------------------------------------------------------
def declare_dependency(task_id: int, depends_on_task_id: int) -> bool:
    """Record "task_id is blocked-by depends_on_task_id" and surface deadlocks.

    Delegates the insert to ``db.add_dependency`` and then runs
    ``detect_cycle`` from the dependent task. When a cycle is found it is
    handed to ``handle_cycle`` (Blocked + alert) right away instead of waiting
    for the reconciler backstop. The edge is deliberately NOT rolled back on a
    cycle: the declaration is left for a human to correct, this function only
    makes the problem visible.

    The cycle check also runs when the edge already existed, because an
    existing cycle may have become relevant; escalation happens only when a
    cycle is actually found.

    Returns whatever ``db.add_dependency`` reported (True iff a NEW edge row
    was inserted; an idempotent re-declaration gives False).

    never-raise: any error -> False.
    """
    try:
        was_inserted = db.add_dependency(task_id, depends_on_task_id)
        if deadlock := detect_cycle(task_id):
            handle_cycle(deadlock)
        return was_inserted
    except Exception:
        return False
def ingest_plane_relations(
    task_id: int, issue_id: str, project_id: str
) -> int:
    """B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps.

    Reads the issue's ``blocked_by`` predecessors from Plane, resolves each to
    a local task via ``db.get_task_by_plane_id`` and declares the edge with
    ``declare_dependency``. A predecessor that has no local task row yet is
    SKIPPED — the scheduler can only gate on tasks it knows; a re-run after
    that task is created will pick it up.

    Active ONLY when ``settings.task_deps_source`` is ``plane`` or ``hybrid``
    (default ``db`` -> no Plane call at all). Returns 0 without calling Plane
    when ``issue_id`` or ``project_id`` is empty.

    never-raise (self-hosting): a failing Plane fetch — including a fetch that
    returns ``None`` or something that cannot be iterated — yields 0 edges; a
    failure on a single predecessor skips just that predecessor.

    Returns the number of NEW edges declared.
    """
    source = (getattr(settings, "task_deps_source", "db") or "db").strip().lower()
    if source not in ("plane", "hybrid"):
        return 0
    if not issue_id or not project_id:
        return 0
    try:
        # Imported lazily: the default ``db`` source never needs plane_sync.
        from . import plane_sync
        blocked_by = plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id)
        # Materialise inside the guard so a None / non-iterable / lazily
        # failing result degrades to "no edges" instead of raising from the
        # loop below.
        predecessors = list(blocked_by or ())
    except Exception:
        return 0
    declared = 0
    for pred_issue in predecessors:
        try:
            pred = db.get_task_by_plane_id(str(pred_issue))
            if not pred:
                # Predecessor not known locally yet — nothing to gate on.
                continue
            if declare_dependency(task_id, pred["id"]):
                declared += 1
        except Exception:
            # One bad predecessor must not prevent the others from being
            # declared.
            continue
    return declared
# ---------------------------------------------------------------------------
# Observability (/queue snapshot, G-2)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
    """Build the read-only dependency summary exposed by GET /queue (G-2).

    Pure observability — NOT a source of truth. The returned dict has:

    * ``enabled`` — the ``task_deps_enabled`` setting as a bool;
    * ``source`` — the ``task_deps_source`` setting (default ``"db"``);
    * ``edges`` — number of declared edges;
    * ``blocked_tasks`` — one ``{task_id, work_item_id, waiting_on}`` entry
      per dependent task that ``is_task_ready`` reports as not ready;
    * ``cycle`` — the labels (work-item id, or ``task#<id>`` when unknown) of
      a detected cycle path, or None.

    never-raise for the data part: any error while reading edges or building
    the lists -> the two flags plus empty data.
    """
    flags = {
        "enabled": bool(getattr(settings, "task_deps_enabled", False)),
        "source": getattr(settings, "task_deps_source", "db"),
    }
    try:
        edges = db.get_dependency_edges()
        # Only tasks that appear as a dependent can be blocked.
        dependents = {edge[0] for edge in edges}
        blocked: list[dict] = []
        for task_id in dependents:
            ready, waiting_on = is_task_ready(task_id)
            if ready:
                continue
            blocked.append({
                "task_id": task_id,
                "work_item_id": _work_item_id_for(task_id),
                "waiting_on": waiting_on,
            })
        cycle_path = find_any_cycle(edges)
        if cycle_path:
            cycle_labels = [
                (_work_item_id_for(member) or f"task#{member}")
                for member in cycle_path
            ]
        else:
            cycle_labels = None
        return {
            **flags,
            "edges": len(edges),
            "blocked_tasks": blocked,
            "cycle": cycle_labels,
        }
    except Exception:
        return {
            **flags,
            "edges": 0,
            "blocked_tasks": [],
            "cycle": None,
        }