feat(ORCH-026): task dependencies (B waits for A) + single-repo merge serialization

Level A — merge/deploy serialization within one repo: reuse the existing
ORCH-043/065 merge-lease (no new mechanism); the only new logic is an
unconditional pre-merge rebase in check_branch_mergeable — under the held
lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always
(default True), not just when the branch is behind. No-op on an up-to-date
branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI
not triggered). Kill-switch off -> ORCH-043 behaviour 1:1.

Level B — declarative task dependencies: additive job_deps table
(CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate
(NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without
occupying a max_concurrency slot; inert on empty job_deps -> zero regression.
New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle
detection + Blocked/alert, declare/ingest_plane_relations (db source never
hits the network on the hot path), snapshot. Telegram waiting-line, /queue
observability, reconciler skip + cycle backstop, reaper untouched.

Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (dep gate is a
claim_next_job врезка, not a registered QG), DB schema of existing tables,
HTTP endpoints; non-self repos remain a no-op on empty deps/scope.

Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE.
Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md,
adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green.

Refs: ORCH-026

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-08 19:06:22 +03:00
committed by stream
parent 9019e12d98
commit a74379f657
24 changed files with 1686 additions and 2 deletions

View File

@@ -396,6 +396,37 @@ class Settings(BaseSettings):
merge_pr_timeout_s: int = 60
merge_verify_timeout_s: int = 60
# ORCH-026: intra-repo merge serialisation (Level A) + declarative task
# dependencies (Level B). Level A reuses the ORCH-043/065 merge-lease window
# (no new mechanism) — the merge-lease already serialises "merge -> main-updated"
# per repo; the ONLY new behaviour is an unconditional pre-merge rebase. Level B
# adds a new ADDITIVE job_deps table + a NOT EXISTS gate in claim_next_job. Both
# features are inert without data (no applicable repo / no declared deps) ->
# zero regression for enduro-trails.
# premerge_rebase_always -> Level A (A-2): when True, check_branch_mergeable
# ALWAYS rebases the task branch onto the CURRENT
# origin/main UNDER the merge-lease (not only when
# branch_is_behind_main) — a deterministic anti-phantom
# that does not depend on the ancestor check's precision.
# auto_rebase_onto_main is a cheap no-op on an already
# up-to-date branch (rc 0, push up-to-date, CI not
# retriggered). Scope = merge_gate_repos (empty ->
# self-hosting). Kill-switch (False -> exactly the
# ORCH-043 behaviour: rebase only when behind). Env
# ORCH_PREMERGE_REBASE_ALWAYS.
# task_deps_enabled -> Level B (B-2): global kill-switch for the scheduler
# dependency gate. False -> claim_next_job is 1:1 as
# ORCH-1 (the NOT EXISTS clause is omitted). Inert when
# job_deps is empty. Env ORCH_TASK_DEPS_ENABLED.
# task_deps_source -> declaration source: db|plane|hybrid (default db).
# The scheduler ALWAYS reads the DB cache (offline-safe
# hot path); plane/hybrid additionally ingest Plane
# `blocked-by` relations into job_deps at task creation.
# Env ORCH_TASK_DEPS_SOURCE.
premerge_rebase_always: bool = True
task_deps_enabled: bool = True
task_deps_source: str = "db"
# ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify
# under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a
# secondary deterministic (no-LLM) guard checks that a declarative set of markers

130
src/db.py
View File

@@ -123,6 +123,24 @@ def init_db():
# tracker can show "твоё время" without recomputing from activity history.
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
# scheduler gate in claim_next_job keeps B queued until every A reaches
# tasks.stage='done'. Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS, no
# change to jobs/tasks/agent_runs/events columns) -> idempotent and safe on
# the live shared prod DB (enduro-trails data untouched). The logical FK on
# tasks.id is intentional (no REFERENCES, mirrors jobs.task_id) so the
# migration cannot fail on a pre-existing DB. See 08-data-requirements.md.
conn.executescript("""
CREATE TABLE IF NOT EXISTS job_deps (
task_id INTEGER NOT NULL,
depends_on_task_id INTEGER NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (task_id, depends_on_task_id)
);
CREATE INDEX IF NOT EXISTS idx_job_deps_task ON job_deps(task_id);
CREATE INDEX IF NOT EXISTS idx_job_deps_depends ON job_deps(depends_on_task_id);
""")
conn.commit()
conn.close()
@@ -466,12 +484,28 @@ def claim_next_job() -> dict | None:
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
when the queue is empty.
"""
# ORCH-026 (Level B, B-2): scheduler dependency gate. When task_deps_enabled
# is on, a job whose task has an UNFINISHED declared dependency
# (job_deps.depends_on_task_id -> a task with stage != 'done') is NOT
# claimable -> it stays 'queued' without occupying a max_concurrency slot.
# Jobs with a NULL task_id (no task) or with no job_deps rows are unaffected
# (NOT EXISTS is True). Kill-switch off -> the clause is omitted -> 1:1 the
# ORCH-1 query. The gate reads only the DB (offline-safe hot path).
dep_gate = ""
if getattr(settings, "task_deps_enabled", False):
dep_gate = (
"AND NOT EXISTS ("
" SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
" WHERE d.task_id = jobs.task_id AND t.stage != 'done'"
") "
)
conn = get_db()
try:
while True:
row = conn.execute(
"SELECT id FROM jobs WHERE status='queued' "
"AND (available_at IS NULL OR available_at <= datetime('now')) "
f"{dep_gate}"
"ORDER BY id LIMIT 1"
).fetchone()
if not row:
@@ -705,6 +739,102 @@ def recent_jobs(limit: int = 10) -> list[dict]:
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# ORCH-026 (Level B): declarative task-dependency helpers
# ---------------------------------------------------------------------------
def add_dependency(task_id: int, depends_on_task_id: int) -> bool:
"""Declare that task ``task_id`` (B) is blocked-by ``depends_on_task_id`` (A).
Idempotent INSERT OR IGNORE against the job_deps PK (re-declaring the same
edge is a no-op). A self-edge (task depends on itself) is rejected — it would
deadlock the task forever and can never be satisfied. never-raise
(self-hosting safety, AC-G1): any DB error -> returns False, the caller must
not crash the webhook / worker. Returns True iff a NEW edge row was inserted.
"""
if task_id is None or depends_on_task_id is None:
return False
if task_id == depends_on_task_id:
return False
try:
conn = get_db()
try:
cur = conn.execute(
"INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) "
"VALUES (?, ?)",
(task_id, depends_on_task_id),
)
conn.commit()
return cur.rowcount == 1
finally:
conn.close()
except Exception:
return False
def get_dependencies(task_id: int) -> list[int]:
"""Return the list of depends_on_task_id (A) that ``task_id`` (B) waits for.
never-raise: any DB error -> [] (conservative: caller treats the task as
having no declared dependency rather than crashing).
"""
try:
conn = get_db()
try:
rows = conn.execute(
"SELECT depends_on_task_id FROM job_deps WHERE task_id = ?",
(task_id,),
).fetchall()
finally:
conn.close()
return [r[0] for r in rows]
except Exception:
return []
def get_dependency_edges() -> list[tuple[int, int]]:
"""Return ALL declared edges as ``(task_id, depends_on_task_id)`` tuples.
Used by the cycle detector (DFS over the whole declared graph) and the
/queue snapshot. never-raise -> [] on any DB error.
"""
try:
conn = get_db()
try:
rows = conn.execute(
"SELECT task_id, depends_on_task_id FROM job_deps"
).fetchall()
finally:
conn.close()
return [(r[0], r[1]) for r in rows]
except Exception:
return []
def get_unfinished_dependencies(task_id: int) -> list[dict]:
"""Return the UNFINISHED dependencies of ``task_id`` (A's not yet 'done').
Each dict carries the predecessor's ``id``, ``work_item_id`` and ``stage``
so the readiness gate / Telegram waiting-line can name what B is waiting for.
never-raise -> [] on any DB error (treated as "ready", consistent with the
scheduler omitting the gate on failure).
"""
try:
conn = get_db()
try:
rows = conn.execute(
"SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage "
"FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
"WHERE d.task_id = ? AND t.stage != 'done'",
(task_id,),
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
except Exception:
return []
# ---------------------------------------------------------------------------
# ORCH-1b (resilience): transient backoff helpers
# ---------------------------------------------------------------------------

View File

@@ -148,6 +148,7 @@ async def queue():
from .job_reaper import reaper
from . import post_deploy
from . import merge_gate
from . import task_deps
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
@@ -157,5 +158,8 @@ async def queue():
"reaper": reaper.status(),
"post_deploy": post_deploy.status(),
"merge_verify": merge_gate.merge_verify_status(),
# ORCH-026 (G-2): declarative task-dependency observability (read-only,
# NOT a source of truth) — declared edges, blocked tasks, detected cycle.
"task_deps": task_deps.snapshot(),
"recent": recent_jobs(10),
}

View File

@@ -380,6 +380,22 @@ def render_task_tracker(task_id: int) -> str:
status_line = f"\U0001f4cd {status_label}"
lines = [header, status_line, bar]
# ORCH-026 (B-4): waiting-line for a task blocked by an unfinished declared
# dependency. Shows WHAT the task is waiting on ("⏳ ждёт ORCH-NNN"),
# so the single tracker card (invariant preserved) makes the wait visible.
# Never breaks the render: any error -> no waiting-line.
if not done:
try:
from . import task_deps
from .config import settings as _settings
if getattr(_settings, "task_deps_enabled", False):
ready, waiting_on = task_deps.is_task_ready(task_id)
if not ready and waiting_on:
waits = ", ".join(link_for(w) for w in waiting_on)
lines.append(f"⏳ ждёт {waits}")
except Exception:
pass
def _stage_line(label, run):
usage = {
"input_tokens": run["input_tokens"],

View File

@@ -433,6 +433,72 @@ def fetch_issue_state(issue_id: str, project_id: str, timeout: int = 10) -> str
return None
def fetch_blocked_by_issue_ids(issue_id: str, project_id: str, timeout: int = 10) -> list[str]:
"""ORCH-026 (B-1): list the Plane issue UUIDs that ``issue_id`` is BLOCKED-BY.
Reads the Plane issue-relation endpoint and returns the related issue UUIDs
declared as ``blocked_by`` (i.e. the predecessors A that this task B waits
for). Plane's relation payload shape has varied across versions, so the parse
is defensive: it accepts either a grouped object (``{"blocked_by": [...]}``)
or a flat list of ``{"relation_type": ..., "related_issue": ...}`` rows, and
pulls a uuid from ``related_issue`` / ``issue`` / ``id`` (bare uuid or nested
``{"id": ...}``).
never-raise (AC-G1, self-hosting): a Plane outage / non-2xx / unexpected
shape -> ``[]`` (no edge declared), so the ingestion degrades conservatively
and the pipeline never stalls on the network.
"""
if not issue_id or not project_id:
return []
url = (
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}"
f"/issues/{issue_id}/issue-relation/"
)
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=timeout)
resp.raise_for_status()
body = resp.json()
except Exception as e:
logger.warning(f"fetch_blocked_by_issue_ids failed for {issue_id}: {e}")
return []
def _uuid_of(row) -> str | None:
if isinstance(row, str):
return row
if isinstance(row, dict):
for key in ("related_issue", "issue", "id"):
v = row.get(key)
if isinstance(v, dict):
v = v.get("id")
if v:
return str(v)
return None
out: list[str] = []
try:
rows = []
if isinstance(body, dict):
# Grouped shape: {"blocked_by": [...], "blocking": [...], ...}
if "blocked_by" in body and isinstance(body["blocked_by"], list):
rows = body["blocked_by"]
else:
# Flat shape nested under common envelope keys.
rows = body.get("results") or body.get("relations") or []
elif isinstance(body, list):
rows = body
for row in rows:
# In the flat shape, keep only blocked_by rows.
if isinstance(row, dict) and row.get("relation_type") not in (None, "blocked_by"):
continue
uid = _uuid_of(row)
if uid and uid != issue_id:
out.append(uid)
except Exception as e:
logger.warning(f"fetch_blocked_by_issue_ids parse error for {issue_id}: {e}")
return []
return out
import re as _re

View File

@@ -673,8 +673,19 @@ def check_branch_mergeable(repo: str, work_item_id: str, branch: str) -> tuple[b
return False, reason
try:
# ORCH-026 (Level A, A-2): proactive pre-merge rebase. When
# premerge_rebase_always is on, ALWAYS rebase onto the CURRENT
# origin/main under the held lease — even when branch_is_behind_main
# says "not behind". The ancestor check can miss a divergence
# (squash/force-push history, ORCH-073 phantom-merge class), so an
# unconditional rebase is a deterministic anti-phantom: it guarantees
# B carries A's code before merge. auto_rebase_onto_main is a cheap
# no-op on an already up-to-date branch (rc 0, push up-to-date, CI not
# retriggered). Kill-switch off -> 1:1 the ORCH-043 short-circuit
# below (rebase only when behind).
always = bool(getattr(settings, "premerge_rebase_always", False))
# Double-check under the lease: another task may have just merged.
if not merge_gate.branch_is_behind_main(repo, branch):
if not always and not merge_gate.branch_is_behind_main(repo, branch):
logger.info("check_branch_mergeable: %s up-to-date with main", branch)
return True, "branch up-to-date with main"

View File

@@ -69,6 +69,7 @@ from .plane_sync import (
from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram, link_for
from . import projects
from . import task_deps
logger = logging.getLogger("orchestrator.reconciler")
@@ -165,6 +166,16 @@ class Reconciler:
f"reconciler F-1: task {task.get('id')} "
f"(stage={task.get('stage')}) failed: {e}"
)
# ORCH-026 (B-3) backstop: surface ANY dependency deadlock in the declared
# graph, even one whose tasks are not individually evaluated above (e.g. no
# active queued job). One alert per cycle; never-raise.
if settings.task_deps_enabled:
try:
cyc = task_deps.find_any_cycle()
if cyc:
task_deps.handle_cycle(cyc)
except Exception as e: # noqa: BLE001 - never break the sweep
logger.error(f"reconciler F-1: cycle backstop failed: {e}")
def _reconcile_gate_task(self, task: dict) -> None:
task_id = task["id"]
@@ -194,6 +205,18 @@ class Reconciler:
# Networked; runs after Guard 1 so escalated tasks never hit Plane.
if self._is_blocked_or_needs_input(task):
return
# ORCH-026 Guard 3 (B-5): a task blocked by an unfinished declared
# dependency is legitimately waiting, NOT stuck -> F-1 must not advance it
# past its depends-on (mirrors the Blocked/Needs-Input skip). Local DB,
# never-raise (is_task_ready fails OPEN). If the wait is actually a
# dependency DEADLOCK (cycle), surface it (Blocked + alert) once.
if settings.task_deps_enabled:
ready, _waiting = task_deps.is_task_ready(task_id)
if not ready:
cyc = task_deps.detect_cycle(task_id)
if cyc:
task_deps.handle_cycle(cyc)
return
result = advance_if_gate_passed(
task_id,
stage,

335
src/task_deps.py Normal file
View File

@@ -0,0 +1,335 @@
"""ORCH-026 (Level B): declarative task-dependency logic.
Leaf module — pure, unit-testable functions over the additive ``job_deps`` table
(see src/db.py / 08-data-requirements.md). It answers two questions the rest of
the pipeline asks:
* "is task B ready to run?" — every declared predecessor A reached
``tasks.stage = 'done'`` (``is_task_ready``). The scheduler gate in
``db.claim_next_job`` enforces the same predicate in SQL; this Python copy is
for the reconciler skip and for naming WHAT a task waits on (visibility).
* "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can
never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` /
``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it
to Blocked + alert so the deadlock is visible instead of silent.
never-raise contract (AC-G1, self-hosting safety): EVERY public function
degrades conservatively on any error (DB/import) and NEVER propagates an
exception into the worker / reconciler / webhook. Readiness fails OPEN
(``True``) so a transient DB error cannot wedge the whole queue; cycle detection
fails CLOSED-safe (``None`` = "no cycle proven", do not block).
"""
from __future__ import annotations
import logging
from . import db
from .config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Readiness gate (B-2)
# ---------------------------------------------------------------------------
def is_task_ready(task_id: int) -> tuple[bool, list[str]]:
"""Return ``(ready, waiting_on)`` for a task.
``ready`` is True when the task has no declared dependency whose predecessor
is still un-done (``tasks.stage != 'done'``). ``waiting_on`` is the list of
predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still blocked
by — used for the Telegram waiting-line / Plane visibility.
never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the
scheduler omitting the gate when the DB read fails; a transient error must
not wedge an otherwise-claimable task).
"""
if task_id is None:
return True, []
try:
unfinished = db.get_unfinished_dependencies(task_id)
except Exception:
return True, []
if not unfinished:
return True, []
waiting_on = [
str(d.get("work_item_id") or d.get("id"))
for d in unfinished
]
return False, waiting_on
# ---------------------------------------------------------------------------
# Cycle / deadlock detection (B-3)
# ---------------------------------------------------------------------------
def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]:
"""Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map.
Edge direction follows the dependency: an edge (B, A) means "B depends on A",
so we traverse from a dependent task towards its predecessors. A cycle in
this graph is an unsatisfiable deadlock.
"""
adj: dict[int, list[int]] = {}
for task_id, depends_on in edges:
adj.setdefault(task_id, []).append(depends_on)
return adj
def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None:
"""Iterative DFS from ``start``; return a cycle path if one is reachable.
Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None.
Iterative (explicit stack) so a pathological deep graph cannot blow the
Python recursion limit — relevant on the shared prod process.
"""
WHITE, GREY, BLACK = 0, 1, 2
color: dict[int, int] = {}
parent: dict[int, int] = {}
# stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK).
stack: list[tuple[int, bool]] = [(start, False)]
while stack:
node, is_exit = stack.pop()
if is_exit:
color[node] = BLACK
continue
if color.get(node, WHITE) != WHITE:
continue
color[node] = GREY
stack.append((node, True))
for nxt in adj.get(node, []):
c = color.get(nxt, WHITE)
if c == GREY:
# Back-edge -> cycle. Reconstruct path nxt..node via parent.
path = [node]
cur = node
while cur != nxt and cur in parent:
cur = parent[cur]
path.append(cur)
path.reverse()
path.append(nxt)
return path
if c == WHITE:
parent[nxt] = node
stack.append((nxt, False))
return None
def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None:
"""Detect a dependency cycle reachable from ``task_id``.
Returns the cycle path (node sequence, first == last) or None when the graph
reachable from ``task_id`` is acyclic. ``edges`` may be injected (unit tests);
otherwise the full declared edge set is read from the DB.
never-raise: any error -> None (do not falsely claim a deadlock on an error).
"""
if task_id is None:
return None
try:
if edges is None:
edges = db.get_dependency_edges()
adj = _build_adjacency(edges)
return _find_cycle_from(task_id, adj)
except Exception:
return None
def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None:
"""Backstop: detect ANY cycle in the whole declared graph.
Used by the reconciler tick to surface a deadlock even when no specific task
is being evaluated. Returns the first cycle found or None. never-raise -> None.
"""
try:
if edges is None:
edges = db.get_dependency_edges()
adj = _build_adjacency(edges)
for node in list(adj.keys()):
cyc = _find_cycle_from(node, adj)
if cyc:
return cyc
return None
except Exception:
return None
def _work_item_id_for(task_id: int) -> str | None:
"""Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise)."""
try:
conn = db.get_db()
try:
row = conn.execute(
"SELECT work_item_id FROM tasks WHERE id = ?", (task_id,)
).fetchone()
finally:
conn.close()
return row[0] if row and row[0] else None
except Exception:
return None
def handle_cycle(cycle: list[int]) -> bool:
"""Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1).
For every task in the cycle, sets its Plane issue to Blocked (best-effort)
and sends ONE Telegram alert naming the cycle, so a deadlock is visible
instead of a silent forever-wait. Does NOT mutate job_deps / stages — the
declaration is the human's to fix. never-raise: any notify/Plane error is
swallowed; the worker/reconciler never crashes. Returns True if an alert was
attempted, False on a no-op / error.
"""
if not cycle:
return False
try:
# Map task ids -> work-item ids for a human-readable chain.
labels: list[str] = []
seen: set[int] = set()
for tid in cycle:
wi = _work_item_id_for(tid)
labels.append(wi or f"task#{tid}")
if tid not in seen:
seen.add(tid)
chain = " -> ".join(labels)
try:
from . import notifications, plane_sync
except Exception:
return False
# Blocked indication on each distinct issue in the cycle.
for tid in seen:
wi = _work_item_id_for(tid)
if wi:
try:
plane_sync.set_issue_blocked(wi)
except Exception:
pass
try:
notifications.send_telegram(
f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. "
f"Tasks set to Blocked — fix the blocked-by declaration."
)
except Exception:
pass
logger.error("ORCH-026: dependency cycle detected: %s", chain)
return True
except Exception:
return False
# ---------------------------------------------------------------------------
# Declaration (B-1) — db.add_dependency + immediate cycle escalation
# ---------------------------------------------------------------------------
def declare_dependency(task_id: int, depends_on_task_id: int) -> bool:
"""Declare "task_id (B) blocked-by depends_on_task_id (A)" and check for a cycle.
Thin wrapper over ``db.add_dependency`` that, after a successful insert, runs
``detect_cycle`` from the new dependent — so a freshly-introduced deadlock is
surfaced (Blocked + alert) at declaration time (best UX, ADR B-3) rather than
only by the reconciler backstop. The edge is NOT rolled back on a cycle (the
SQL gate already keeps the cyclic tasks un-claimable; the human fixes the
declaration) — we make it VISIBLE. never-raise: any error -> False.
Returns True iff a NEW edge row was inserted (idempotent re-declaration ->
False, matching db.add_dependency).
"""
try:
inserted = db.add_dependency(task_id, depends_on_task_id)
# Always check for a cycle (even on a duplicate edge an existing cycle may
# now be relevant), but only escalate when one is actually found.
cyc = detect_cycle(task_id)
if cyc:
handle_cycle(cyc)
return inserted
except Exception:
return False
def ingest_plane_relations(
task_id: int, issue_id: str, project_id: str
) -> int:
"""B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps.
Reads the issue's ``blocked_by`` predecessors from Plane, resolves each to a
local ``tasks.id`` (intra-repo only, v1) and declares the edge. A predecessor
not yet known locally (no task row) is SKIPPED — the scheduler can only gate
on tasks it knows; a re-run after that task is created will pick it up.
Active ONLY when ``task_deps_source`` is ``plane`` or ``hybrid`` (default
``db`` -> no Plane call on the hot creation path). never-raise (self-hosting):
any error -> 0 edges, the pipeline start is never blocked by Plane. Returns
the number of edges declared.
"""
source = (getattr(settings, "task_deps_source", "db") or "db").strip().lower()
if source not in ("plane", "hybrid"):
return 0
if not issue_id or not project_id:
return 0
try:
from . import plane_sync
blocked_by = plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id)
except Exception:
return 0
declared = 0
for pred_issue in blocked_by:
try:
pred = db.get_task_by_plane_id(str(pred_issue))
if not pred:
continue
if declare_dependency(task_id, pred["id"]):
declared += 1
except Exception:
continue
return declared
# ---------------------------------------------------------------------------
# Observability (/queue snapshot, G-2)
# ---------------------------------------------------------------------------
def snapshot() -> dict:
"""Read-only summary of the dependency subsystem for GET /queue (G-2).
Returns a dict (NOT a source of truth — pure observability):
* ``enabled`` — task_deps_enabled flag;
* ``source`` — task_deps_source (db|plane|hybrid);
* ``edges`` — number of declared edges;
* ``blocked_tasks`` — list of ``{task_id, work_item_id, waiting_on}`` for
tasks with at least one un-done predecessor;
* ``cycle`` — a detected cycle path (work-item labels) or None.
never-raise: any error -> a minimal dict with the flags and empty data.
"""
enabled = bool(getattr(settings, "task_deps_enabled", False))
source = getattr(settings, "task_deps_source", "db")
try:
edges = db.get_dependency_edges()
blocked: list[dict] = []
for task_id in {e[0] for e in edges}:
ready, waiting_on = is_task_ready(task_id)
if not ready:
blocked.append({
"task_id": task_id,
"work_item_id": _work_item_id_for(task_id),
"waiting_on": waiting_on,
})
cyc = find_any_cycle(edges)
cycle_labels = None
if cyc:
cycle_labels = [(_work_item_id_for(t) or f"task#{t}") for t in cyc]
return {
"enabled": enabled,
"source": source,
"edges": len(edges),
"blocked_tasks": blocked,
"cycle": cycle_labels,
}
except Exception:
return {
"enabled": enabled,
"source": source,
"edges": 0,
"blocked_tasks": [],
"cycle": None,
}

View File

@@ -608,6 +608,17 @@ async def start_pipeline(data: dict, project_id: str = ""):
except Exception as e:
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")
# ORCH-026 (B-1): import declared Plane `blocked-by` relations into job_deps
# (only for task_deps_source = plane|hybrid; default `db` -> no-op, no Plane
# call). Best-effort, never-raise: a Plane outage must not block the start.
try:
from .. import task_deps
n = task_deps.ingest_plane_relations(task_id, plane_id, plane_project_id)
if n:
logger.info(f"Task {task_id}: ingested {n} blocked-by dependency edge(s)")
except Exception as e:
logger.warning(f"Task {task_id}: dependency ingestion skipped: {e}")
async def handle_comment(data: dict, project_id: str = ""):
"""Status-only verdict model: comments NEVER drive the pipeline.