orchestrator/tests/test_orch124_serial_gate_pause.py
claude-bot 87af857082
fix(serial-gate): pause-without-blocking via per-task park signal (ORCH-124)
Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task"
purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses
Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change
tasks.stage (layer A), so a paused predecessor was indistinguishable from an active
one and held the FIFO gate closed against an urgent successor — the urgent fix
could not start until the paused task was formally done.

Introduces an explicit, durable, DB-resolvable per-task "park" signal — additive
nullable column tasks.paused_at (pattern of cancelled_at/track) — and a new
ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes
`stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points
(build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set
{done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged
(adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused
declared dependency and an active repo_freeze STILL block (pause never bypasses
them — different axes). Anti-stale-base on resume relies on the existing deferred
branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test
(ORCH-026/093/110) — no new rebase machinery.
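
A minimal sketch of the predicate described above, run against an in-memory SQLite table. The reduced schema and the helper name `has_active_predecessor` are illustrative assumptions, not the actual `src/serial_gate.py` code; only the predicate string and the "earlier task id" ordering come from this change.

```python
import sqlite3

# Hypothetical reduced schema: only the columns the predicate reads.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, repo TEXT, stage TEXT, paused_at TEXT)"
)

# The serial-gate "active task" predicate as stated in this commit message.
ACTIVE_PREDICATE = "stage NOT IN ('done','cancelled') AND paused_at IS NULL"


def has_active_predecessor(conn, repo, task_id):
    """Return True if an earlier task in the same repo still counts as active."""
    row = conn.execute(
        f"SELECT 1 FROM tasks WHERE repo = ? AND id < ? AND {ACTIVE_PREDICATE} LIMIT 1",
        (repo, task_id),
    ).fetchone()
    return row is not None


# Task 1 is an unfinished predecessor, task 2 is the urgent successor.
conn.execute("INSERT INTO tasks (id, repo, stage) VALUES (1, 'orchestrator', 'development')")
conn.execute("INSERT INTO tasks (id, repo, stage) VALUES (2, 'orchestrator', 'analysis')")

blocked_before = has_active_predecessor(conn, "orchestrator", 2)

# Parking task 1 sets paused_at; its stage is left untouched.
conn.execute("UPDATE tasks SET paused_at = datetime('now') WHERE id = 1")
blocked_after = has_active_predecessor(conn, "orchestrator", 2)
```

In this sketch the successor is gated before the park and released after it, while the stage of the parked task never changes.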

Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim
fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_*
/ machine-verdict keys / existing table schemas are byte-for-byte untouched (this is
a queue-scheduler + observability change, not a Quality Gate).
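
The two fail directions can be illustrated with a small sketch. The function names and the callable-based signatures below are assumptions made for illustration; the real functions are `build_claim_clause` and `is_repo_frozen`, whose internals are not shown here.

```python
from typing import Callable, Optional

BASE_CLAUSE = "stage NOT IN ('done','cancelled')"


def build_clause(pause_enabled: Callable[[], bool]) -> str:
    """Hot-claim side: on any error return an empty fragment (fail-OPEN)."""
    try:
        if pause_enabled():
            return BASE_CLAUSE + " AND paused_at IS NULL"
        return BASE_CLAUSE
    except Exception:
        # An empty fragment adds no restriction, so the queue is never wedged.
        return ""


def is_frozen(read_freeze_row: Callable[[], Optional[dict]]) -> bool:
    """Freeze side: on any error report the repo as frozen (fail-CLOSED)."""
    try:
        return read_freeze_row() is not None
    except Exception:
        # When the freeze state cannot be read, assume the freeze holds.
        return True


def _boom():
    raise RuntimeError("probe down")


open_fragment = build_clause(_boom)
closed_on_doubt = is_frozen(_boom)
```

The asymmetry is deliberate in the sketch: a broken pause probe must not stall the queue, while a broken freeze read must not let work through.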

- src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers
- src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused`
  list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot
- src/config.py + .env.example: serial_gate_pause_enabled (default True = true no-op)
- src/main.py: POST /serial-gate/pause|resume?work_item=<id> (modeled on unfreeze)
- tests/test_orch124_serial_gate_pause.py: TC-01 mandatory incident regress + TC-02..15
- CHANGELOG.md: [Unreleased] entry
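
The per-job `reason` priority listed above (freeze > dependency > active-task > null) could be resolved along these lines. This is a sketch only: the function name `wait_reason` and its boolean inputs are hypothetical, and the actual snapshot code may compute these conditions differently.

```python
from typing import Optional


def wait_reason(
    frozen: bool,
    has_unfinished_dependency: bool,
    has_active_predecessor: bool,
) -> Optional[str]:
    """Return the highest-priority reason a job is waiting, or None."""
    if frozen:
        return "freeze"
    if has_unfinished_dependency:
        return "dependency"
    if has_active_predecessor:
        return "active-task"
    # A paused predecessor is by design not a wait reason.
    return None


reason_all = wait_reason(True, True, True)
reason_dep = wait_reason(False, True, True)
reason_active = wait_reason(False, False, True)
reason_none = wait_reason(False, False, False)
```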

ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md
Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md

Refs: ORCH-124

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 19:35:55 +03:00


"""ORCH-124 — serial-gate wait/pause semantics (real tmp SQLite, no network).
A paused predecessor must NOT block an urgent successor's analyst-job, while a
normally-executing predecessor still holds the FIFO gate (anti-stale-base ORCH-088
preserved). Covers 04-test-plan.yaml TC-01…TC-15. The behaviour (not the exact SQL)
is asserted: pause is an explicit, durable, DB-resolvable per-task signal
(``tasks.paused_at``) that the offline hot-claim SQL reads locally.
TC-01 REGRESS (mandatory): earlier PAUSED task A + later urgent B -> claim picks
B's analyst-job (gate open). Reproduces incident ORCH-116/ORCH-123.
TC-02 Predecessor parked (Backlog intent) -> build_claim_clause does NOT block B.
TC-03 Predecessor parked at another wait-stage (Needs-Input intent) -> still open.
TC-04 ANTI-REGRESS ORCH-088: a NON-paused unfinished predecessor STILL blocks B.
TC-05 Pause needs explicit durable intent; unpaused non-terminal task stays active.
TC-06 Durable: the pause signal survives a connection/restart (read from the DB).
TC-07 Resume restores participation in the gate (no eternal bypass).
TC-08 Explicit blocks kept: an active repo_freeze still gates B (pause != bypass).
TC-09 Explicit blocks kept: an unfinished declared dependency still gates B.
TC-10 /queue snapshot: paused task not shown as active_task; reason is correct.
TC-11 Three points agree on "active" (anti-drift): clause / mirror / snapshot.
TC-12 Hot-path offline: claim resolves pause with no network (Plane not consulted).
TC-13 never-raise / fail-directions: pause error -> build_claim_clause fail-OPEN.
TC-14 Kill-switch: pause sub-flag off -> byte-for-byte ORCH-088/090 (paused blocks).
TC-15 Structural anti-regress: STAGE_TRANSITIONS / QG_CHECKS / table schemas intact.
"""
import os
import tempfile

import pytest

os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch124_pause.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as db  # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job  # noqa: E402
from src import serial_gate  # noqa: E402
from src import config as cfg  # noqa: E402


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    dbfile = tmp_path / "pause.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    # Serial gate ON; freeze layer ON; pause layer ON; empty CSV (all repos).
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False)
    monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", True, raising=False)
    # Keep the unrelated dep-gate inert unless a test opts in.
    monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
    init_db()
    yield


def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    conn = get_db()
    cur = conn.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
    )
    tid = cur.lastrowid
    conn.commit()
    conn.close()
    return tid


# --------------------------------------------------------------- TC-01
def test_paused_predecessor_does_not_block_urgent_successor():
    """REGRESS (ORCH-116/ORCH-123): earlier PAUSED A must not gate urgent B."""
    a = _make_task("ORCH-116", stage="development")  # earlier predecessor
    b = _make_task("ORCH-123", stage="analysis")  # later urgent task
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    # Before the pause A holds the FIFO gate -> B is blocked (the incident state).
    assert claim_next_job() is None, "active A gates B (pre-pause, FIFO ORCH-088)"
    # Operator parks A. Now B's analyst-job must become claimable.
    assert db.set_task_paused(a) is True
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a PAUSED predecessor must not gate the urgent successor (AC-1)"
    )


# --------------------------------------------------------------- TC-02
def test_parked_backlog_predecessor_not_active_in_clause():
    a = _make_task("ORCH-201", stage="analysis")  # "Backlog" intent
    b = _make_task("ORCH-202", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    assert "paused_at IS NULL" in serial_gate.build_claim_clause()
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-03
def test_parked_needs_input_predecessor_not_active():
    # Another wait-stage (review ~ "Needs-Input" intent); same park column.
    a = _make_task("ORCH-203", stage="review")
    b = _make_task("ORCH-204", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-04
def test_non_paused_predecessor_still_blocks_fifo():
    """ANTI-REGRESS ORCH-088: a normally-executing A still gates B."""
    _make_task("ORCH-210", stage="development")  # NOT paused
    b = _make_task("ORCH-211", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    assert claim_next_job() is None, (
        "a non-paused unfinished predecessor must STILL hold the gate (FIFO intact)"
    )


# --------------------------------------------------------------- TC-05
def test_pause_requires_explicit_durable_intent():
    a = _make_task("ORCH-215", stage="development")
    b = _make_task("ORCH-216", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    # No explicit pause -> A is active -> gate held (no heuristic auto-pause).
    assert db.is_task_paused(a) is False
    assert claim_next_job() is None
    # The pause signal is DB-resolvable once set explicitly.
    db.set_task_paused(a)
    assert db.is_task_paused(a) is True


# --------------------------------------------------------------- TC-06
def test_pause_signal_is_durable_across_restart():
    a = _make_task("ORCH-220", stage="development")
    b = _make_task("ORCH-221", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Simulate a restart: drop in-memory state, re-run the idempotent migration.
    init_db()
    assert db.is_task_paused(a) is True, "pause must survive restart (read from DB)"
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b


# --------------------------------------------------------------- TC-07
def test_resume_restores_gate_participation():
    a = _make_task("ORCH-225", stage="development")
    b = _make_task("ORCH-226", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    assert claim_next_job() is not None  # B claimable while A paused
    # Re-queue a fresh analyst-job for B (the previous one was claimed) and resume A.
    conn = get_db()
    conn.execute("UPDATE jobs SET status='queued', started_at=NULL WHERE task_id=?", (b,))
    conn.commit()
    conn.close()
    assert db.clear_task_paused(a) is True
    assert db.is_task_paused(a) is False
    assert claim_next_job() is None, (
        "after resume A holds the gate again — no eternal bypass (AC-10)"
    )


# --------------------------------------------------------------- TC-08
def test_pause_does_not_bypass_freeze():
    _make_task("ORCH-230", stage="done")  # nothing unfinished
    a = _make_task("ORCH-231", stage="development")
    b = _make_task("ORCH-232", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Freeze the repo: even with A paused, B must stay blocked by the freeze.
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-230")
    assert claim_next_job() is None, "an active freeze gates B; pause must not bypass it"
    # Clearing the freeze (A still paused) -> B becomes claimable.
    serial_gate.clear_repo_freeze("orchestrator")
    assert claim_next_job() is not None


# --------------------------------------------------------------- TC-09
def test_pause_does_not_bypass_declared_dependency(monkeypatch):
    monkeypatch.setattr(cfg.settings, "task_deps_enabled", True, raising=False)
    a = _make_task("ORCH-240", stage="development")
    b = _make_task("ORCH-241", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    assert db.add_dependency(b, a) is True  # B blocked-by A
    db.set_task_paused(a)
    # task_deps reads the {done,cancelled} terminal only (NOT paused_at): an
    # unfinished declared dependency keeps B blocked even though A is paused.
    assert claim_next_job() is None, (
        "a declared unfinished dependency gates B; pause must not bypass it (AC-5)"
    )
    # Once A is terminal the dependency is satisfied -> B is claimable.
    conn = get_db()
    conn.execute("UPDATE tasks SET stage='done' WHERE id=?", (a,))
    conn.commit()
    conn.close()
    assert claim_next_job() is not None


# --------------------------------------------------------------- TC-10
def test_snapshot_reason_and_paused_list():
    a = _make_task("ORCH-250", stage="development")
    b = _make_task("ORCH-251", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    # (a) A active (not paused) -> B waits with reason 'active-task'; A is active_task.
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert per["active_task"]["work_item_id"] == "ORCH-250"
    assert per["paused"] == []
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] == "active-task"
    # Existing keys preserved (BC).
    assert set(per) >= {"active_task", "waiting", "frozen", "frozen_reason", "frozen_at"}
    # (b) Pause A -> A no longer active_task; it appears in `paused`; B is claimable
    # (reason None: a paused predecessor is by design NOT a wait reason).
    db.set_task_paused(a)
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert per["active_task"] is None or per["active_task"]["work_item_id"] != "ORCH-250"
    assert any(p["work_item_id"] == "ORCH-250" for p in per["paused"])
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] is None
    # (c) Freeze -> reason 'freeze' (highest priority).
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-250")
    per = serial_gate.snapshot()["per_repo"]["orchestrator"]
    wb = next(w for w in per["waiting"] if w["job_id"] == job_b)
    assert wb["reason"] == "freeze"


# --------------------------------------------------------------- TC-11
def test_three_points_agree_on_active():
    """Anti-drift: clause / mirror / snapshot classify predecessor A identically.

    B is excluded from the mirror (``exclude_task_id=b``) to mirror the clause's
    own-row exclusion (``t2.id < jobs.task_id``), so the three points are asked the
    SAME question: "does the non-B predecessor A count as an active blocker?".
    """
    a = _make_task("ORCH-260", stage="development")
    b = _make_task("ORCH-261", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    # A NOT paused -> all three say A is active.
    assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is True
    assert (serial_gate.snapshot()["per_repo"]["orchestrator"]["active_task"]
            ["work_item_id"] == "ORCH-260")
    assert claim_next_job() is None  # clause blocks B on A
    # A paused -> all three agree A is NOT active (consistent, no drift).
    db.set_task_paused(a)
    assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is False
    snap = serial_gate.snapshot()["per_repo"]["orchestrator"]
    active = snap["active_task"]
    assert active is None or active["work_item_id"] != "ORCH-260"
    assert any(p["work_item_id"] == "ORCH-260" for p in snap["paused"])
    assert claim_next_job() is not None  # clause now opens for B


# --------------------------------------------------------------- TC-12
def test_hot_path_is_offline():
    """The pause predicate resolves from the local DB only, with no network."""
    a = _make_task("ORCH-270", stage="development")
    b = _make_task("ORCH-271", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Functional: claim works with no Plane configured/reachable.
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b
    # Structural: the gate leaf imports no network client (offline hot path, NFR-2).
    import inspect
    src = inspect.getsource(serial_gate)
    for forbidden in ("import httpx", "import requests", "plane_sync", "urllib.request"):
        assert forbidden not in src, f"serial_gate must stay offline (found {forbidden!r})"


# --------------------------------------------------------------- TC-13
def test_pause_error_fails_open_and_never_raises(monkeypatch):
    _make_task("ORCH-280", stage="development")  # would close the gate
    b = _make_task("ORCH-281", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    def _boom():
        raise RuntimeError("pause layer probe down")

    monkeypatch.setattr(serial_gate, "_pause_layer_enabled", _boom, raising=True)
    # build_claim_clause must fail-OPEN ('' fragment): never raise, never wedge.
    assert serial_gate.build_claim_clause() == ""
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a pause-layer error must fail-OPEN, not wedge the queue (AC-9)"
    )
    # The other public functions degrade conservatively without raising.
    assert serial_gate.repo_has_active_task("orchestrator") in (True, False)
    assert isinstance(serial_gate.snapshot(), dict)
    # Freeze direction is NOT inverted by a pause error (still fail-CLOSED on doubt).
    monkeypatch.setattr(
        serial_gate, "_active_freeze_row",
        lambda repo: (_ for _ in ()).throw(RuntimeError("freeze read down")),
        raising=True,
    )
    assert serial_gate.is_repo_frozen("orchestrator") is True
    # The DB mutators/readers never raise on bad input either.
    assert db.set_task_paused(None) is False
    assert db.clear_task_paused(None) is False
    assert db.is_task_paused(None) is False


# --------------------------------------------------------------- TC-14
def test_kill_switch_off_is_byte_for_byte_orch088(monkeypatch):
    monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", False, raising=False)
    a = _make_task("ORCH-290", stage="development")
    b = _make_task("ORCH-291", stage="analysis")
    enqueue_job("analyst", "orchestrator", "B", task_id=b)
    db.set_task_paused(a)
    # Pause sub-flag OFF -> the pause-term is omitted -> a paused task STILL counts
    # as active (deliberate ORCH-088/090 rollback behaviour).
    assert "paused_at" not in serial_gate.build_claim_clause()
    assert claim_next_job() is None, (
        "with the pause sub-flag off serial-gate is byte-for-byte ORCH-088/090"
    )
    # Outside the (empty) repo scope nothing changes for enduro either.
    et = _make_task("ET-290", stage="analysis", repo="enduro-trails")
    job_et = enqueue_job("analyst", "enduro-trails", "B", task_id=et)
    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_et


# --------------------------------------------------------------- TC-15
def test_registries_and_schemas_unchanged():
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS
    # ORCH-124 is a scheduler-only change: no new edge, no new terminal sink.
    assert set(STAGE_TRANSITIONS) == {
        "created", "analysis", "architecture", "development", "review",
        "testing", "deploy-staging", "deploy", "done", "cancelled",
    }
    # No serial-gate / pause QG check was introduced (the gate is a scheduler cond).
    assert not any("serial" in k or "pause" in k for k in QG_CHECKS)
    # Existing table schemas intact; tasks gained the additive paused_at column.
    conn = get_db()
    try:
        task_cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
        job_cols = {r[1] for r in conn.execute("PRAGMA table_info(jobs)").fetchall()}
        dep_cols = {r[1] for r in conn.execute("PRAGMA table_info(job_deps)").fetchall()}
        frz_cols = {r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()}
    finally:
        conn.close()
    assert "paused_at" in task_cols  # additive
    assert {"id", "repo", "stage", "work_item_id"}.issubset(task_cols)
    assert {"id", "agent", "repo", "status", "task_id"}.issubset(job_cols)
    assert {"task_id", "depends_on_task_id"}.issubset(dep_cols)
    assert {"repo", "frozen_at", "cleared_at"}.issubset(frz_cols)