Files
orchestrator/tests/test_orch026_deps_integration.py
claude-bot a74379f657 feat(ORCH-026): task dependencies (B waits for A) + single-repo merge serialization
Level A — merge/deploy serialization within one repo: reuse the existing
ORCH-043/065 merge-lease (no new mechanism); the only new logic is an
unconditional pre-merge rebase in check_branch_mergeable — under the held
lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always
(default True), not just when the branch is behind. No-op on an up-to-date
branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI
not triggered). Kill-switch off -> ORCH-043 behaviour 1:1.

Level B — declarative task dependencies: additive job_deps table
(CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate
(NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without
occupying a max_concurrency slot; inert on empty job_deps -> zero regression.
New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle
detection + Blocked/alert, declare/ingest_plane_relations (db source never
hits the network on the hot path), snapshot. Telegram waiting-line, /queue
observability, reconciler skip + cycle backstop, reaper untouched.

Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (dep gate is a
claim_next_job inline hook, not a registered QG), DB schema of existing tables,
HTTP endpoints; non-self repos remain a no-op on empty deps/scope.

Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE.
Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md,
adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green.

Refs: ORCH-026

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-08 19:17:44 +03:00

125 lines
4.6 KiB
Python

"""ORCH-026 Level B — declarative dependencies integration (TC-B08).
End-to-end (DB level): B declared blocked-by A; queued B does not start until A
is 'done'; after A->done the worker can claim B. Also covers the plane/hybrid
ingestion path: Plane `blocked-by` relations are resolved to local task ids and
written into job_deps (the scheduler then reads only the DB).
"""
import os
import tempfile
import pytest
# Environment is configured before the `src` imports below (hence their E402
# suppressions) — presumably the project settings read these variables at
# import time; TODO confirm against src.db.
# ORCH_DB_PATH is overwritten unconditionally; the two tokens only receive a
# dummy value when the environment does not already define them.
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_depsint.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import task_deps # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Run every test against its own freshly initialised database.

    Redirects ``db.settings.db_path`` to a file under pytest's per-test
    ``tmp_path``, switches ``task_deps_enabled`` on, and then calls
    ``init_db()``. Both settings overrides go through ``monkeypatch``, so
    they are reverted automatically when the test finishes.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "depsint.db"))
    # raising=False: tolerate a settings object that lacks this attribute.
    monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False)
    init_db()
    yield
def _make_task(work_item_id, stage="development", plane_id=None):
    """Insert a task row for the ``orchestrator`` repo and return its row id.

    Args:
        work_item_id: Work item key (e.g. ``"ORCH-200"``); also used to build
            the ``feature/<work_item_id>`` branch name.
        stage: Initial value of the ``stage`` column.
        plane_id: Value stored in both ``plane_id`` and ``plane_issue_id``;
            falls back to ``work_item_id`` when falsy.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    pid = plane_id or work_item_id
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (pid, work_item_id, "orchestrator", f"feature/{work_item_id}", stage, pid),
        )
        tid = cur.lastrowid
        conn.commit()
    finally:
        # Release the connection even when the INSERT or commit fails, so a
        # failing test does not leave an open handle on the database file.
        conn.close()
    return tid
def _set_stage(task_id, stage):
    """Overwrite the ``stage`` column of the task with the given row id.

    Args:
        task_id: Primary key (``tasks.id``) of the row to update.
        stage: New stage value to store.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
        conn.commit()
    finally:
        # Always release the connection, including when the UPDATE raises.
        conn.close()
def test_b_waits_for_a_then_runs():
    """A job for B (blocked by A) is claimable only once A's stage is 'done'."""
    task_a = _make_task("ORCH-200", stage="development")
    task_b = _make_task("ORCH-201", stage="development")
    db.add_dependency(task_b, task_a)
    job_b = enqueue_job("developer", "orchestrator", "do B", task_id=task_b)

    # A is still in development: nothing may be claimed and B reports A as
    # the task it is waiting on.
    assert claim_next_job() is None
    ready, waiting = task_deps.is_task_ready(task_b)
    assert ready is False
    assert "ORCH-200" in waiting

    # An intermediate stage is not terminal, so B stays deferred.
    _set_stage(task_a, "review")
    assert claim_next_job() is None

    # Once A is done, B's job is the one handed out.
    _set_stage(task_a, "done")
    claimed = claim_next_job()
    assert claimed is not None
    assert claimed["id"] == job_b
def test_multiple_predecessors_all_must_be_done():
    """With two predecessors, B stays deferred until both of them are 'done'."""
    first_pred = _make_task("ORCH-210", stage="development")
    second_pred = _make_task("ORCH-211", stage="development")
    blocked = _make_task("ORCH-212", stage="development")
    for predecessor in (first_pred, second_pred):
        db.add_dependency(blocked, predecessor)
    job_b = enqueue_job("developer", "orchestrator", "B", task_id=blocked)

    # Finishing only one predecessor must not release B.
    _set_stage(first_pred, "done")
    assert claim_next_job() is None, "still blocked by a2"

    # Finishing the second one does.
    _set_stage(second_pred, "done")
    claimed = claim_next_job()
    assert claimed is not None
    assert claimed["id"] == job_b
# ---- plane/hybrid ingestion path (TC-B01) ---------------------------------
def test_ingest_plane_relations_writes_db(monkeypatch):
    """In 'hybrid' mode a Plane blocked-by relation is stored as a local dependency."""
    monkeypatch.setattr(db.settings, "task_deps_source", "hybrid", raising=False)
    blocker = _make_task("ORCH-220", stage="development", plane_id="plane-uuid-A")
    blocked = _make_task("ORCH-221", stage="development", plane_id="plane-uuid-B")

    import src.plane_sync as plane_sync

    def _fake_fetch(issue_id, project_id, **_kwargs):
        # Stand-in for the Plane lookup: always reports A's Plane id as blocker.
        return ["plane-uuid-A"]

    monkeypatch.setattr(
        plane_sync, "fetch_blocked_by_issue_ids", _fake_fetch, raising=False
    )

    ingested = task_deps.ingest_plane_relations(blocked, "plane-uuid-B", "proj-1")

    assert ingested == 1
    assert db.get_dependencies(blocked) == [blocker]
def test_ingest_noop_when_source_db(monkeypatch):
    """With the 'db' source, ingestion returns 0 and never calls the Plane fetcher."""
    monkeypatch.setattr(db.settings, "task_deps_source", "db", raising=False)
    task = _make_task("ORCH-230", stage="development", plane_id="plane-uuid-Z")

    import src.plane_sync as plane_sync

    fetch_calls = []

    def _recording_fetch(*args, **kwargs):
        # Record every invocation so the test can prove none happened.
        fetch_calls.append((args, kwargs))
        return []

    monkeypatch.setattr(
        plane_sync, "fetch_blocked_by_issue_ids", _recording_fetch, raising=False
    )

    ingested = task_deps.ingest_plane_relations(task, "plane-uuid-Z", "proj-1")

    assert ingested == 0
    assert len(fetch_calls) == 0, "default db source must not call Plane"
def test_ingest_never_raises_on_plane_outage(monkeypatch):
    """A failing Plane fetcher makes ingestion return 0 instead of raising."""
    monkeypatch.setattr(db.settings, "task_deps_source", "plane", raising=False)
    task = _make_task("ORCH-240", stage="development", plane_id="plane-uuid-Y")

    import src.plane_sync as plane_sync

    def _plane_unavailable(*args, **kwargs):
        # Simulate an outage: every fetch attempt blows up.
        raise RuntimeError("plane down")

    monkeypatch.setattr(
        plane_sync, "fetch_blocked_by_issue_ids", _plane_unavailable, raising=False
    )

    ingested = task_deps.ingest_plane_relations(task, "plane-uuid-Y", "proj-1")
    assert ingested == 0