Files
orchestrator/tests/test_orch026_task_deps.py
claude-bot a74379f657 feat(ORCH-026): task dependencies (B waits for A) + single-repo merge serialization
Level A — merge/deploy serialization within one repo: reuse the existing
ORCH-043/065 merge-lease (no new mechanism); the only new logic is an
unconditional pre-merge rebase in check_branch_mergeable — under the held
lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always
(default True), not just when the branch is behind. No-op on an up-to-date
branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI
not triggered). Kill-switch off -> ORCH-043 behaviour 1:1.

Level B — declarative task dependencies: additive job_deps table
(CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate
(NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without
occupying a max_concurrency slot; inert on empty job_deps -> zero regression.
New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle
detection + Blocked/alert, declare/ingest_plane_relations (db source never
hits the network on the hot path), snapshot. Telegram waiting-line, /queue
observability, reconciler skip + cycle backstop, reaper untouched.

Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (dep gate is a
claim_next_job врезка, not a registered QG), DB schema of existing tables,
HTTP endpoints; non-self repos remain a no-op on empty deps/scope.

Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE.
Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md,
adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green.

Refs: ORCH-026

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-08 19:17:44 +03:00

158 lines
6.0 KiB
Python

"""ORCH-026 Level B — declarative task dependencies (TC-B01/B02/B05/B07).
Real SQLite (tmp db). We drive tasks + job_deps directly and assert:
TC-B01 add_dependency declares an edge; get_dependencies resolves it; a
self-edge is rejected; never-raise on a bad input.
TC-B02 is_task_ready: a task with an un-done predecessor is NOT ready; when
every predecessor reaches 'done' it becomes ready.
TC-B05 claim_next_job does NOT claim a dep-blocked job (no slot taken); once
the predecessor is 'done' the job becomes claimable.
TC-B07 reconciler skip helper: is_task_ready=False is honoured (the gate task
is left waiting).
"""
import os
import tempfile
import pytest
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_task_deps.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import task_deps # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "deps.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False)
init_db()
yield
def _make_task(stage="development", work_item_id="ORCH-1", repo="orchestrator"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _set_stage(task_id, stage):
conn = get_db()
conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
conn.commit()
conn.close()
# ----------------------------------------------------------------- TC-B01
def test_add_dependency_declares_and_resolves():
a = _make_task(work_item_id="ORCH-10", stage="development")
b = _make_task(work_item_id="ORCH-11", stage="development")
assert db.add_dependency(b, a) is True
assert db.get_dependencies(b) == [a]
# Idempotent: re-declaring the same edge is a no-op.
assert db.add_dependency(b, a) is False
def test_self_edge_rejected():
a = _make_task(work_item_id="ORCH-12")
assert db.add_dependency(a, a) is False
assert db.get_dependencies(a) == []
def test_add_dependency_never_raises_on_bad_input():
assert db.add_dependency(None, 1) is False
assert db.add_dependency(1, None) is False
# ----------------------------------------------------------------- TC-B02
def test_is_task_ready_blocked_then_ready():
a = _make_task(work_item_id="ORCH-20", stage="development")
b = _make_task(work_item_id="ORCH-21", stage="development")
db.add_dependency(b, a)
ready, waiting = task_deps.is_task_ready(b)
assert ready is False
assert "ORCH-20" in waiting
_set_stage(a, "done")
ready2, waiting2 = task_deps.is_task_ready(b)
assert ready2 is True
assert waiting2 == []
def test_is_task_ready_no_deps_is_ready():
a = _make_task(work_item_id="ORCH-22")
ready, waiting = task_deps.is_task_ready(a)
assert ready is True and waiting == []
# ----------------------------------------------------------------- TC-B05
def test_claim_skips_dep_blocked_job():
a = _make_task(work_item_id="ORCH-30", stage="development")
b = _make_task(work_item_id="ORCH-31", stage="development")
db.add_dependency(b, a)
job_b = enqueue_job("developer", "orchestrator", "do B", task_id=b)
# B is blocked by un-done A -> claim must NOT pick it (no slot taken).
claimed = claim_next_job()
assert claimed is None, "dep-blocked job must not be claimed"
# A finishes -> B becomes claimable.
_set_stage(a, "done")
claimed2 = claim_next_job()
assert claimed2 is not None
assert claimed2["id"] == job_b
def test_claim_prefers_unblocked_job_over_blocked():
a = _make_task(work_item_id="ORCH-40", stage="development")
b = _make_task(work_item_id="ORCH-41", stage="development")
c = _make_task(work_item_id="ORCH-42", stage="development")
db.add_dependency(b, a) # b blocked by a
job_b = enqueue_job("developer", "orchestrator", "B", task_id=b) # older id
job_c = enqueue_job("developer", "orchestrator", "C", task_id=c) # not blocked
claimed = claim_next_job()
assert claimed is not None
assert claimed["id"] == job_c, "blocked B skipped, unblocked C claimed"
assert job_b # referenced
# ----------------------------------------------------------------- TC-B07
def test_reconciler_skip_helper_honours_block(monkeypatch):
"""The reconciler reads is_task_ready; a not-ready task must be skipped."""
from src import reconciler as rec
a = _make_task(work_item_id="ORCH-50", stage="development")
b = _make_task(work_item_id="ORCH-51", stage="development")
db.add_dependency(b, a)
advanced = {"called": False}
monkeypatch.setattr(rec, "advance_if_gate_passed",
lambda *a, **k: advanced.__setitem__("called", True),
raising=False)
monkeypatch.setattr(rec, "has_active_job_for_task", lambda tid: False, raising=False)
monkeypatch.setattr(rec, "developer_retry_count", lambda tid: 0, raising=False)
monkeypatch.setattr(rec.settings, "task_deps_enabled", True, raising=False)
monkeypatch.setattr(rec.settings, "reconcile_enabled", True, raising=False)
monkeypatch.setattr(rec.settings, "reconcile_grace_default_s", 0, raising=False)
r = rec.Reconciler()
# Bypass Guard 2 (networked) so we isolate Guard 3.
monkeypatch.setattr(r, "_is_blocked_or_needs_input", lambda task: False)
task_row = {"id": b, "stage": "development", "repo": "orchestrator",
"work_item_id": "ORCH-51", "branch": "feature/ORCH-51", "age_s": 9999}
r._reconcile_gate_task(task_row)
assert advanced["called"] is False, "dep-blocked task must not be advanced (B-5)"