Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".
Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).
New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.
Refs: ORCH-090
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
339 lines
13 KiB
Python
339 lines
13 KiB
Python
"""ORCH-026 (Level B): declarative task-dependency logic.
|
|
|
|
Leaf module — pure, unit-testable functions over the additive ``job_deps`` table
|
|
(see src/db.py / 08-data-requirements.md). It answers two questions the rest of
|
|
the pipeline asks:
|
|
|
|
* "is task B ready to run?" — every declared predecessor A reached a terminal
  stage, ``tasks.stage`` in ``{done, cancelled}`` (ORCH-090 / adr-0026;
  ``is_task_ready``). The scheduler gate in ``db.claim_next_job`` enforces the
  same predicate in SQL; this Python copy is for the reconciler skip and for
  naming WHAT a task waits on (visibility).
|
|
* "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can
|
|
never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` /
|
|
``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it
|
|
to Blocked + alert so the deadlock is visible instead of silent.
|
|
|
|
never-raise contract (AC-G1, self-hosting safety): EVERY public function
|
|
degrades conservatively on any error (DB/import) and NEVER propagates an
|
|
exception into the worker / reconciler / webhook. Readiness fails OPEN
|
|
(``True``) so a transient DB error cannot wedge the whole queue; cycle detection
|
|
fails CLOSED-safe (``None`` = "no cycle proven", do not block).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from . import db
|
|
from .config import settings
|
|
|
|
# Module logger; handle_cycle reports detected dependency cycles through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Readiness gate (B-2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_task_ready(task_id: int) -> tuple[bool, list[str]]:
    """Return ``(ready, waiting_on)`` for a task.

    ``ready`` is True when the task has no declared dependency whose predecessor
    is still non-terminal. ORCH-090 (adr-0026): the terminal set is
    ``{done, cancelled}`` — a CANCELLED predecessor is terminal and no longer
    blocks the dependent (the actual SQL predicate lives in
    ``db.get_unfinished_dependencies`` / ``db.claim_next_job``). ``waiting_on`` is
    the list of predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is
    still blocked by — used for the Telegram waiting-line / Plane visibility.

    never-raise: a failed DB read -> ``(True, [])`` (fail OPEN — consistent with
    the scheduler omitting the gate when the DB read fails; a transient error
    must not wedge an otherwise-claimable task). Once the read has succeeded the
    readiness answer is known, so building the ``waiting_on`` labels is guarded
    too: a malformed row degrades to a best-effort label instead of raising.
    """
    if task_id is None:
        return True, []
    try:
        unfinished = db.get_unfinished_dependencies(task_id)
    except Exception:
        return True, []
    if not unfinished:
        return True, []
    try:
        waiting_on = [_dependency_label(dep) for dep in unfinished]
    except Exception:
        # Iterating the rows failed; the task is still known to be blocked.
        waiting_on = []
    return False, waiting_on


def _dependency_label(dep: object) -> str:
    """Best-effort display label for one unfinished-dependency row (never-raise).

    Prefers ``work_item_id`` and falls back to ``id``; when both are missing the
    result is ``"None"``, matching the previous ``str(d.get(...) or d.get(...))``
    behaviour for dict rows. Rows without ``.get`` (e.g. ``sqlite3.Row``) are read
    via ``row[key]``.
    """

    def _field(key: str) -> object:
        try:
            getter = getattr(dep, "get", None)
            return getter(key) if callable(getter) else dep[key]
        except Exception:
            return None

    return str(_field("work_item_id") or _field("id"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cycle / deadlock detection (B-3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]:
|
|
"""Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map.
|
|
|
|
Edge direction follows the dependency: an edge (B, A) means "B depends on A",
|
|
so we traverse from a dependent task towards its predecessors. A cycle in
|
|
this graph is an unsatisfiable deadlock.
|
|
"""
|
|
adj: dict[int, list[int]] = {}
|
|
for task_id, depends_on in edges:
|
|
adj.setdefault(task_id, []).append(depends_on)
|
|
return adj
|
|
|
|
|
|
def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None:
|
|
"""Iterative DFS from ``start``; return a cycle path if one is reachable.
|
|
|
|
Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None.
|
|
Iterative (explicit stack) so a pathological deep graph cannot blow the
|
|
Python recursion limit — relevant on the shared prod process.
|
|
"""
|
|
WHITE, GREY, BLACK = 0, 1, 2
|
|
color: dict[int, int] = {}
|
|
parent: dict[int, int] = {}
|
|
# stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK).
|
|
stack: list[tuple[int, bool]] = [(start, False)]
|
|
while stack:
|
|
node, is_exit = stack.pop()
|
|
if is_exit:
|
|
color[node] = BLACK
|
|
continue
|
|
if color.get(node, WHITE) != WHITE:
|
|
continue
|
|
color[node] = GREY
|
|
stack.append((node, True))
|
|
for nxt in adj.get(node, []):
|
|
c = color.get(nxt, WHITE)
|
|
if c == GREY:
|
|
# Back-edge -> cycle. Reconstruct path nxt..node via parent.
|
|
path = [node]
|
|
cur = node
|
|
while cur != nxt and cur in parent:
|
|
cur = parent[cur]
|
|
path.append(cur)
|
|
path.reverse()
|
|
path.append(nxt)
|
|
return path
|
|
if c == WHITE:
|
|
parent[nxt] = node
|
|
stack.append((nxt, False))
|
|
return None
|
|
|
|
|
|
def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None:
    """Return a dependency cycle reachable from ``task_id``, or None.

    The result is a closed node sequence (first == last). ``edges`` may be passed
    in directly (unit tests); when omitted, the complete declared edge set is
    loaded from the DB.

    never-raise: a missing ``task_id`` or any failure yields None, so an error is
    never mistaken for a deadlock.
    """
    if task_id is None:
        return None
    try:
        source = db.get_dependency_edges() if edges is None else edges
        return _find_cycle_from(task_id, _build_adjacency(source))
    except Exception:
        return None
|
|
|
|
|
|
def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None:
    """Backstop scan: return the first dependency cycle anywhere in the graph.

    Meant for the reconciler tick, to surface a deadlock even when no single task
    is under evaluation. Start nodes are tried in adjacency order (the order in
    which dependents first appear in ``edges``). Returns the first cycle found,
    otherwise None. never-raise: any error -> None.
    """
    try:
        source = db.get_dependency_edges() if edges is None else edges
        graph = _build_adjacency(source)
        # Lazy generator: stop searching as soon as one start node yields a cycle.
        candidates = (_find_cycle_from(node, graph) for node in list(graph))
        return next((found for found in candidates if found), None)
    except Exception:
        return None
|
|
|
|
|
|
def _work_item_id_for(task_id: int) -> str | None:
|
|
"""Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise)."""
|
|
try:
|
|
conn = db.get_db()
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT work_item_id FROM tasks WHERE id = ?", (task_id,)
|
|
).fetchone()
|
|
finally:
|
|
conn.close()
|
|
return row[0] if row and row[0] else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def handle_cycle(cycle: list[int]) -> bool:
    """Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1).

    For every distinct task in the cycle, sets its Plane issue to Blocked
    (best-effort, in the order tasks appear along the cycle) and sends ONE
    Telegram alert naming the cycle, so a deadlock is visible instead of a
    silent forever-wait. Does NOT mutate job_deps / stages — the declaration is
    the human's to fix.

    Each task's work-item id is looked up once and reused. The cycle path
    repeats its first node, and the same ids are needed both for the readable
    chain and for the Blocked updates. The cycle is logged before escalation,
    so it stays visible even when the notification modules cannot be imported.

    never-raise: any notify/Plane error is logged and swallowed; the
    worker/reconciler never crashes.

    Returns:
        True if an alert was attempted, False on a no-op (empty cycle) or error.
    """
    if not cycle:
        return False
    try:
        # One lookup per distinct task id, in first-seen order along the path.
        work_items = {tid: _work_item_id_for(tid) for tid in dict.fromkeys(cycle)}
        chain = " -> ".join(work_items[tid] or f"task#{tid}" for tid in cycle)
        logger.error("ORCH-026: dependency cycle detected: %s", chain)
        try:
            from . import notifications, plane_sync
        except Exception:
            logger.warning(
                "ORCH-026: notifications/plane_sync unavailable; cycle not escalated",
                exc_info=True,
            )
            return False
        # Blocked indication on each distinct issue in the cycle.
        for wi in work_items.values():
            if not wi:
                continue
            try:
                plane_sync.set_issue_blocked(wi)
            except Exception:
                logger.warning("ORCH-026: could not set %s to Blocked", wi, exc_info=True)
        try:
            notifications.send_telegram(
                f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. "
                f"Tasks set to Blocked — fix the blocked-by declaration."
            )
        except Exception:
            logger.warning("ORCH-026: dependency-cycle Telegram alert failed", exc_info=True)
        return True
    except Exception:
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Declaration (B-1) — db.add_dependency + immediate cycle escalation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def declare_dependency(task_id: int, depends_on_task_id: int) -> bool:
    """Declare "task_id (B) blocked-by depends_on_task_id (A)" and check for a cycle.

    Thin wrapper over ``db.add_dependency``. After the insert it runs
    ``detect_cycle`` from the new dependent, so a freshly introduced deadlock is
    surfaced (Blocked + alert) at declaration time (best UX, ADR B-3) rather
    than only by the reconciler backstop. The edge is NOT rolled back on a cycle:
    the SQL gate already keeps the cyclic tasks un-claimable, and the human fixes
    the declaration. This function only makes the deadlock VISIBLE.

    never-raise: an insert error -> False. Cycle escalation runs in its own guard
    so that a failure there can never misreport an insert that did happen.

    Returns:
        True iff a NEW edge row was inserted (idempotent re-declaration -> False,
        matching ``db.add_dependency``).
    """
    try:
        inserted = db.add_dependency(task_id, depends_on_task_id)
    except Exception:
        return False
    # Always check for a cycle (even on a duplicate edge an existing cycle may
    # now be relevant), but only escalate when one is actually found.
    try:
        cyc = detect_cycle(task_id)
        if cyc:
            handle_cycle(cyc)
    except Exception:
        logger.warning(
            "ORCH-026: cycle check after declaring %s -> %s failed",
            task_id, depends_on_task_id, exc_info=True,
        )
    return inserted
|
|
|
|
|
|
def ingest_plane_relations(
    task_id: int, issue_id: str, project_id: str
) -> int:
    """B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps.

    Reads the issue's ``blocked_by`` predecessors from Plane, resolves each one to
    a local ``tasks.id`` (intra-repo only, v1) and declares the edge. A
    predecessor that is not yet known locally (no task row) is SKIPPED, because
    the scheduler can only gate on tasks it knows. A re-run after that task is
    created will pick it up.

    Active ONLY when ``task_deps_source`` is ``plane`` or ``hybrid`` (default
    ``db`` -> no Plane call on the hot creation path). never-raise
    (self-hosting): any error -> 0 edges, so the pipeline start is never blocked
    by Plane. That includes an unreadable/non-string setting and a ``None``
    answer from Plane, which are treated as "db" and "no relations"
    respectively.

    Returns:
        The number of edges newly declared.
    """
    try:
        source = str(getattr(settings, "task_deps_source", "db") or "db").strip().lower()
    except Exception:
        source = "db"
    if source not in ("plane", "hybrid"):
        return 0
    if not issue_id or not project_id:
        return 0
    try:
        from . import plane_sync
        # Materialise inside the guard: a None answer or a broken iterable must
        # not escape as an exception from the loop below.
        blocked_by = list(
            plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id) or []
        )
    except Exception:
        return 0
    declared = 0
    for pred_issue in blocked_by:
        try:
            pred = db.get_task_by_plane_id(str(pred_issue))
            if pred and declare_dependency(task_id, pred["id"]):
                declared += 1
        except Exception:
            continue
    return declared
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Observability (/queue snapshot, G-2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def snapshot() -> dict:
    """Read-only summary of the dependency subsystem for GET /queue (G-2).

    Returns a dict (NOT a source of truth — pure observability):
      * ``enabled`` — task_deps_enabled flag;
      * ``source`` — task_deps_source (db|plane|hybrid);
      * ``edges`` — number of declared edges;
      * ``blocked_tasks`` — list of ``{task_id, work_item_id, waiting_on}`` for
        tasks with at least one non-terminal predecessor (terminal =
        ``{done, cancelled}``, as decided by ``is_task_ready``), ordered by first
        appearance of the dependent in the edge list;
      * ``cycle`` — a detected cycle path (work-item labels) or None.

    never-raise: an unreadable setting falls back to its default; any other
    error -> a minimal dict with the flags and empty data.
    """
    try:
        enabled = bool(getattr(settings, "task_deps_enabled", False))
        source = getattr(settings, "task_deps_source", "db")
    except Exception:
        enabled, source = False, "db"
    try:
        edges = db.get_dependency_edges()
        # Memoise work-item lookups: cycle members are usually blocked tasks too.
        work_items: dict[int, str | None] = {}

        def _label(tid: int) -> str | None:
            if tid not in work_items:
                work_items[tid] = _work_item_id_for(tid)
            return work_items[tid]

        blocked: list[dict] = []
        # dict.fromkeys de-duplicates dependents while keeping edge order, so
        # the payload is stable (a set does not preserve that order).
        for task_id in dict.fromkeys(e[0] for e in edges):
            ready, waiting_on = is_task_ready(task_id)
            if not ready:
                blocked.append({
                    "task_id": task_id,
                    "work_item_id": _label(task_id),
                    "waiting_on": waiting_on,
                })
        cyc = find_any_cycle(edges)
        cycle_labels = [(_label(t) or f"task#{t}") for t in cyc] if cyc else None
        return {
            "enabled": enabled,
            "source": source,
            "edges": len(edges),
            "blocked_tasks": blocked,
            "cycle": cycle_labels,
        }
    except Exception:
        return {
            "enabled": enabled,
            "source": source,
            "edges": 0,
            "blocked_tasks": [],
            "cycle": None,
        }
|