Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task"
purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses
Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change
tasks.stage (layer A), so a paused predecessor was indistinguishable from an active
one and held the FIFO gate closed against an urgent successor — the urgent fix
could not start until the paused task was formally done.
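To make the failure mode concrete, here is a minimal sketch against an in-memory SQLite database. It is not the orchestrator's actual query. It assumes a simplified `tasks` table with only `id`, `repo` and `stage`, plus a hypothetical helper `gate_closed_old`. The Plane status never reaches `tasks.stage`, so a machine-stage-only predicate still counts the parked predecessor as active:

```python
import sqlite3

# Simplified stand-in for the real tasks table (assumption: only the columns
# the gate predicate needs; the real schema has many more).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, repo TEXT, stage TEXT)")
# Task 1: earlier predecessor, parked in Plane (Backlog), but its machine
# stage (layer A) is still 'development'. Task 2: the urgent successor.
conn.executemany(
    "INSERT INTO tasks (id, repo, stage) VALUES (?, ?, ?)",
    [(1, "orchestrator", "development"), (2, "orchestrator", "analysis")],
)

# Pre-fix "active task" predicate: machine stage only (hypothetical shape).
OLD_ACTIVE_PREDECESSORS = (
    "SELECT COUNT(*) FROM tasks "
    "WHERE repo = ? AND id < ? AND stage NOT IN ('done', 'cancelled')"
)


def gate_closed_old(repo: str, task_id: int) -> bool:
    """True if an earlier non-terminal task in the same repo holds the FIFO gate."""
    (count,) = conn.execute(OLD_ACTIVE_PREDECESSORS, (repo, task_id)).fetchone()
    return count > 0


# The Plane status lives outside tasks.stage, so task 2 stays blocked.
print(gate_closed_old("orchestrator", 2))  # True
```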
Introduces an explicit, durable, DB-resolvable per-task "park" signal (an additive
nullable column tasks.paused_at, following the cancelled_at/track pattern) and a new
ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes
`stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points
(build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set
{done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged
(adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused
declared dependency and an active repo_freeze STILL block (pause never bypasses
them — different axes). Anti-stale-base on resume relies on the existing deferred
branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test
(ORCH-026/093/110) — no new rebase machinery.
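The same simplified schema can illustrate both axes after the fix. This is a sketch under stated assumptions, not the shipped code. `ensure_column`, `set_task_paused`, `clear_task_paused`, `gate_closed` and `dependency_unfinished` are hypothetical stand-ins; the real helpers live in src/db.py and src/serial_gate.py and may differ. The `job_deps` columns are taken from the test module's schema check. Parking a task removes it from the serial-gate count. The dependency check reads only the terminal set, so it keeps the successor blocked:

```python
import sqlite3

TERMINAL = ("done", "cancelled")  # terminal set, unchanged by the pause axis


def ensure_column(conn, table, column, decl):
    """Idempotent additive migration (hypothetical analogue of db._ensure_column)."""
    cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column not in cols:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")


def set_task_paused(conn, task_id):
    """Park a task; returns False instead of raising (hypothetical helper)."""
    if task_id is None:
        return False
    try:
        cur = conn.execute(
            "UPDATE tasks SET paused_at = datetime('now') WHERE id = ?", (task_id,)
        )
        return cur.rowcount == 1
    except sqlite3.Error:
        return False


def clear_task_paused(conn, task_id):
    """Resume a task so it rejoins the serial gate (hypothetical helper)."""
    if task_id is None:
        return False
    try:
        cur = conn.execute("UPDATE tasks SET paused_at = NULL WHERE id = ?", (task_id,))
        return cur.rowcount == 1
    except sqlite3.Error:
        return False


def gate_closed(conn, repo, task_id):
    """New serial-gate predicate: earlier, non-terminal AND not parked."""
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM tasks WHERE repo = ? AND id < ? "
        "AND stage NOT IN (?, ?) AND paused_at IS NULL",
        (repo, task_id, *TERMINAL),
    ).fetchone()
    return count > 0


def dependency_unfinished(conn, task_id):
    """Dependency axis: reads the terminal set only, never paused_at."""
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
        "WHERE d.task_id = ? AND t.stage NOT IN (?, ?)",
        (task_id, *TERMINAL),
    ).fetchone()
    return count > 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, repo TEXT, stage TEXT)")
conn.execute("CREATE TABLE job_deps (task_id INTEGER, depends_on_task_id INTEGER)")
ensure_column(conn, "tasks", "paused_at", "TEXT")  # NULL means "not parked"
ensure_column(conn, "tasks", "paused_at", "TEXT")  # re-run on restart: no-op
conn.executemany(
    "INSERT INTO tasks (id, repo, stage) VALUES (?, ?, ?)",
    [(1, "orchestrator", "development"), (2, "orchestrator", "analysis")],
)
conn.execute("INSERT INTO job_deps VALUES (2, 1)")  # task 2 declared blocked-by task 1

set_task_paused(conn, 1)
print(gate_closed(conn, "orchestrator", 2))  # False: the parked task no longer gates
print(dependency_unfinished(conn, 2))  # True: pause does not bypass the dependency
```

In this sketch, re-running `ensure_column` on startup makes the migration restart-safe. The column is added only when `PRAGMA table_info` does not already list it.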
Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim
fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_*
/ machine-verdict keys / existing table schemas are byte-for-byte untouched (this is
a queue-scheduler + observability change, not a Quality Gate).
- src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers
- src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused`
list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot
- src/config.py + .env.example: serial_gate_pause_enabled (default True; a no-op until a
task is explicitly paused, since paused_at is NULL by default)
- src/main.py: POST /serial-gate/pause|resume?work_item=<id> (modeled on the unfreeze endpoint)
- tests/test_orch124_serial_gate_pause.py: TC-01 (mandatory incident regression) + TC-02..15
- CHANGELOG.md: [Unreleased] entry
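The `reason` priority listed for the /queue snapshot can be expressed as a small pure function. `wait_reason` and its boolean inputs are hypothetical. The labels and their order come from the snapshot description above and TC-10. A paused predecessor does not count as an active blocker, so it never produces a reason of its own:

```python
from typing import Optional


def wait_reason(frozen: bool, dep_blocked: bool, active_predecessor: bool) -> Optional[str]:
    """Highest-priority reason a queued job waits, or None if nothing blocks it.

    Priority: freeze > dependency > active-task > None. A paused predecessor
    is not an active predecessor, so it is never passed in as a blocker.
    """
    if frozen:
        return "freeze"
    if dep_blocked:
        return "dependency"
    if active_predecessor:
        return "active-task"
    return None


print(wait_reason(frozen=True, dep_blocked=True, active_predecessor=True))  # freeze
```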
ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md
Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md
Refs: ORCH-124
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
354 lines
17 KiB
Python
"""ORCH-124 — serial-gate wait/pause semantics (real tmp SQLite, no network).

A paused predecessor must NOT block an urgent successor's analyst-job, while a
normally-executing predecessor still holds the FIFO gate (anti-stale-base ORCH-088
preserved). Covers 04-test-plan.yaml TC-01…TC-15. The behaviour (not the exact SQL)
is asserted: pause is an explicit, durable, DB-resolvable per-task signal
(``tasks.paused_at``) that the offline hot-claim SQL reads locally.

TC-01 REGRESS (mandatory): earlier PAUSED task A + later urgent B -> claim picks
      B's analyst-job (gate open). Reproduces incident ORCH-116/ORCH-123.
TC-02 Predecessor parked (Backlog intent) -> build_claim_clause does NOT block B.
TC-03 Predecessor parked at another wait-stage (Needs-Input intent) -> still open.
TC-04 ANTI-REGRESS ORCH-088: a NON-paused unfinished predecessor STILL blocks B.
TC-05 Pause needs explicit durable intent; unpaused non-terminal task stays active.
TC-06 Durable: the pause signal survives a connection/restart (read from the DB).
TC-07 Resume restores participation in the gate (no eternal bypass).
TC-08 Explicit blocks kept: an active repo_freeze still gates B (pause != bypass).
TC-09 Explicit blocks kept: an unfinished declared dependency still gates B.
TC-10 /queue snapshot: paused task not shown as active_task; reason is correct.
TC-11 Three points agree on "active" (anti-drift): clause / mirror / snapshot.
TC-12 Hot-path offline: claim resolves pause with no network (Plane not consulted).
TC-13 never-raise / fail-directions: pause error -> build_claim_clause fail-OPEN.
TC-14 Kill-switch: pause sub-flag off -> byte-for-byte ORCH-088/090 (paused blocks).
TC-15 Structural anti-regress: STAGE_TRANSITIONS / QG_CHECKS / table schemas intact.
"""

import os
import tempfile

import pytest

os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch124_pause.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as db  # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job  # noqa: E402
from src import serial_gate  # noqa: E402
from src import config as cfg  # noqa: E402


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    dbfile = tmp_path / "pause.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    # Serial gate ON; freeze layer ON; pause layer ON; empty CSV (all repos).
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", True, raising=False)
    # Keep the unrelated dep-gate inert unless a test opts in.
    monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
    init_db()
    yield


def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    conn = get_db()
    cur = conn.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
    )
    tid = cur.lastrowid
    conn.commit()
    conn.close()
    return tid


# --------------------------------------------------------------- TC-01
def test_paused_predecessor_does_not_block_urgent_successor():
    """REGRESS (ORCH-116/ORCH-123): earlier PAUSED A must not gate urgent B."""
    a = _make_task("ORCH-116", stage="development")  # earlier predecessor
    b = _make_task("ORCH-123", stage="analysis")  # later urgent task
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    # Before the pause A holds the FIFO gate -> B is blocked (the incident state).
    assert claim_next_job() is None, "active A gates B (pre-pause, FIFO ORCH-088)"

    # Operator parks A. Now B's analyst-job must become claimable.
    assert db.set_task_paused(a) is True
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a PAUSED predecessor must not gate the urgent successor (AC-1)"
    )


# --------------------------------------------------------------- TC-02
def test_parked_backlog_predecessor_not_active_in_clause():
    a = _make_task("ORCH-201", stage="analysis")  # "Backlog" intent
    b = _make_task("ORCH-202", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    assert "paused_at IS NULL" in serial_gate.build_claim_clause()
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-03
def test_parked_needs_input_predecessor_not_active():
    # Another wait-stage (review ~ "Needs-Input" intent) — same park column.
    a = _make_task("ORCH-203", stage="review")
    b = _make_task("ORCH-204", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-04
def test_non_paused_predecessor_still_blocks_fifo():
    """ANTI-REGRESS ORCH-088: a normally-executing A still gates B."""
    _make_task("ORCH-210", stage="development")  # NOT paused
    b = _make_task("ORCH-211", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    assert claim_next_job() is None, (
        "a non-paused unfinished predecessor must STILL hold the gate (FIFO intact)"
    )


# --------------------------------------------------------------- TC-05
def test_pause_requires_explicit_durable_intent():
    a = _make_task("ORCH-215", stage="development")
    b = _make_task("ORCH-216", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    # No explicit pause -> A is active -> gate held (no heuristic auto-pause).
    assert db.is_task_paused(a) is False
    assert claim_next_job() is None
    # The pause signal is DB-resolvable once set explicitly.
    db.set_task_paused(a)
    assert db.is_task_paused(a) is True


# --------------------------------------------------------------- TC-06
def test_pause_signal_is_durable_across_restart():
    a = _make_task("ORCH-220", stage="development")
    b = _make_task("ORCH-221", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Simulate a restart: drop in-memory state, re-run the idempotent migration.
    init_db()
    assert db.is_task_paused(a) is True, "pause must survive restart (read from DB)"
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-07
def test_resume_restores_gate_participation():
    a = _make_task("ORCH-225", stage="development")
    b = _make_task("ORCH-226", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    assert claim_next_job() is not None  # B claimable while A paused
    # Put B's analyst-job back in the queue (it was claimed above), then resume A.
    conn = get_db()
    conn.execute("UPDATE jobs SET status='queued', started_at=NULL WHERE task_id=?", (b,))
    conn.commit()
    conn.close()
    assert db.clear_task_paused(a) is True
    assert db.is_task_paused(a) is False
    assert claim_next_job() is None, (
        "after resume A holds the gate again — no eternal bypass (AC-10)"
    )


# --------------------------------------------------------------- TC-08
def test_pause_does_not_bypass_freeze():
    _make_task("ORCH-230", stage="done")  # nothing unfinished
    a = _make_task("ORCH-231", stage="development")
    b = _make_task("ORCH-232", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Freeze the repo: even with A paused, B must stay blocked by the freeze.
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-230")
    assert claim_next_job() is None, "an active freeze gates B; pause must not bypass it"
    # Clearing the freeze (A still paused) -> B becomes claimable.
    serial_gate.clear_repo_freeze("orchestrator")
    assert claim_next_job() is not None


# --------------------------------------------------------------- TC-09
def test_pause_does_not_bypass_declared_dependency(monkeypatch):
    monkeypatch.setattr(cfg.settings, "task_deps_enabled", True, raising=False)
    a = _make_task("ORCH-240", stage="development")
    b = _make_task("ORCH-241", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    assert db.add_dependency(b, a) is True  # B blocked-by A
    db.set_task_paused(a)
    # task_deps reads the {done,cancelled} terminal only (NOT paused_at): an
    # unfinished declared dependency keeps B blocked even though A is paused.
    assert claim_next_job() is None, (
        "a declared unfinished dependency gates B; pause must not bypass it (AC-5)"
    )
    # Once A is terminal the dependency is satisfied -> B is claimable.
    conn = get_db()
    conn.execute("UPDATE tasks SET stage='done' WHERE id=?", (a,))
    conn.commit()
    conn.close()
    assert claim_next_job() is not None


# --------------------------------------------------------------- TC-10
def test_snapshot_reason_and_paused_list():
    a = _make_task("ORCH-250", stage="development")
    b = _make_task("ORCH-251", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    # (a) A active (not paused) -> B waits with reason 'active-task'; A is active_task.
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert per["active_task"]["work_item_id"] == "ORCH-250"
    assert per["paused"] == []
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] == "active-task"
    # Existing keys preserved (BC).
    assert set(per) >= {"active_task", "waiting", "frozen", "frozen_reason", "frozen_at"}

    # (b) Pause A -> A no longer active_task; it appears in `paused`; B is claimable
    # (reason None — a paused predecessor is by design NOT a wait reason).
    db.set_task_paused(a)
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert per["active_task"] is None or per["active_task"]["work_item_id"] != "ORCH-250"
    assert any(p["work_item_id"] == "ORCH-250" for p in per["paused"])
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] is None

    # (c) Freeze -> reason 'freeze' (highest priority).
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-250")
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] == "freeze"


# --------------------------------------------------------------- TC-11
def test_three_points_agree_on_active():
    """Anti-drift: clause / mirror / snapshot classify predecessor A identically.

    B is excluded from the mirror (``exclude_task_id=b``) to mirror the clause's
    own-row exclusion (``t2.id < jobs.task_id``), so the three points are asked the
    SAME question: "does the non-B predecessor A count as an active blocker?".
    """
    a = _make_task("ORCH-260", stage="development")
    b = _make_task("ORCH-261", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)

    # A NOT paused -> all three say A is active.
    assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is True
    assert (serial_gate.snapshot()["per_repo"]["orchestrator"]["active_task"]
            ["work_item_id"] == "ORCH-260")
    assert claim_next_job() is None  # clause blocks B on A

    # A paused -> all three agree A is NOT active (consistent, no drift).
    db.set_task_paused(a)
    assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is False
    snap = serial_gate.snapshot()["per_repo"]["orchestrator"]
    active = snap["active_task"]
    assert active is None or active["work_item_id"] != "ORCH-260"
    assert any(p["work_item_id"] == "ORCH-260" for p in snap["paused"])
    assert claim_next_job() is not None  # clause now opens for B


# --------------------------------------------------------------- TC-12
def test_hot_path_is_offline():
    """The pause predicate resolves from the local DB only — no network."""
    a = _make_task("ORCH-270", stage="development")
    b = _make_task("ORCH-271", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Functional: claim works with no Plane configured/reachable.
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b
    # Structural: the gate leaf imports no network client (offline hot path, NFR-2).
    import inspect
    src = inspect.getsource(serial_gate)
    for forbidden in ("import httpx", "import requests", "plane_sync", "urllib.request"):
        assert forbidden not in src, f"serial_gate must stay offline (found {forbidden!r})"


# --------------------------------------------------------------- TC-13
def test_pause_error_fails_open_and_never_raises(monkeypatch):
    _make_task("ORCH-280", stage="development")  # would close the gate
    b = _make_task("ORCH-281", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    def _boom():
        raise RuntimeError("pause layer probe down")

    monkeypatch.setattr(serial_gate, "_pause_layer_enabled", _boom, raising=True)
    # build_claim_clause must fail-OPEN ('' fragment) — never raise, never wedge.
    assert serial_gate.build_claim_clause() == ""
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a pause-layer error must fail-OPEN, not wedge the queue (AC-9)"
    )
    # The other public functions degrade conservatively without raising.
    assert serial_gate.repo_has_active_task("orchestrator") in (True, False)
    assert isinstance(serial_gate.snapshot(), dict)
    # Freeze direction is NOT inverted by a pause error (still fail-CLOSED on doubt).
    monkeypatch.setattr(
        serial_gate, "_active_freeze_row",
        lambda repo: (_ for _ in ()).throw(RuntimeError("freeze read down")),
        raising=True,
    )
    assert serial_gate.is_repo_frozen("orchestrator") is True
    # The DB mutators/readers never raise on bad input either.
    assert db.set_task_paused(None) is False
    assert db.clear_task_paused(None) is False
    assert db.is_task_paused(None) is False


# --------------------------------------------------------------- TC-14
def test_kill_switch_off_is_byte_for_byte_orch088(monkeypatch):
    monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", False, raising=False)
    a = _make_task("ORCH-290", stage="development")
    b = _make_task("ORCH-291", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Pause sub-flag OFF -> the pause-term is omitted -> a paused task STILL counts
    # as active (deliberate ORCH-088/090 rollback behaviour).
    assert "paused_at" not in serial_gate.build_claim_clause()
    assert claim_next_job() is None, (
        "with the pause sub-flag off serial-gate is byte-for-byte ORCH-088/090"
    )
    # Other repos are unaffected: enduro-trails has no unfinished predecessor,
    # so its job is still claimable.
    et = _make_task("ET-290", stage="analysis", repo="enduro-trails")
    job_et = enqueue_job("analyst", "enduro-trails", "B", task_id=et)
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_et


# --------------------------------------------------------------- TC-15
def test_registries_and_schemas_unchanged():
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS
    # ORCH-124 is a scheduler-only change: no new edge, no new terminal sink.
    assert set(STAGE_TRANSITIONS) == {
        "created", "analysis", "architecture", "development", "review",
        "testing", "deploy-staging", "deploy", "done", "cancelled",
    }
    # No serial-gate / pause QG check was introduced (the gate is a scheduler cond).
    assert not any("serial" in k or "pause" in k for k in QG_CHECKS)
    # Existing table schemas intact; tasks gained the additive paused_at column.
    conn = get_db()
    try:
        task_cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
        job_cols = {r[1] for r in conn.execute("PRAGMA table_info(jobs)").fetchall()}
        dep_cols = {r[1] for r in conn.execute("PRAGMA table_info(job_deps)").fetchall()}
        frz_cols = {r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()}
    finally:
        conn.close()
    assert "paused_at" in task_cols  # additive
    assert {"id", "repo", "stage", "work_item_id"}.issubset(task_cols)
    assert {"id", "agent", "repo", "status", "task_id"}.issubset(job_cols)
    assert {"task_id", "depends_on_task_id"}.issubset(dep_cols)
    assert {"repo", "frozen_at", "cleared_at"}.issubset(frz_cols)