Files
orchestrator/tests/test_serial_gate.py
claude-bot ebbf2e7a2d feat(cancel): STOP-status task cancellation + relaunch-hole close (ORCH-090)
Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".

Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).

New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.

Refs: ORCH-090

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 21:31:56 +03:00

191 lines
8.4 KiB
Python

"""ORCH-088 — per-repo serial gate, unit tests (real tmp SQLite).
Covers (04-test-plan.yaml):
TC-01 claim_next_job does NOT claim an analyst-job of a NEW task B while the
repo has an unfinished task A (gate closed).
TC-02 serial_gate_applies: enabled + empty CSV -> True for any repo; CSV
membership -> True; repo outside CSV -> False; disabled -> False.
TC-03 jobs of an ALREADY-active task (architect/developer/.../deployer) are
never gated — the single active task advances freely.
TC-08 per-repo: an active orchestrator task does NOT gate an enduro analyst-job.
TC-15 kill-switch off -> claim is 1:1 as before ORCH-088.
TC-16 repo outside a non-empty CSV -> gate inert for that repo.
TC-17 DB/build error in the gate -> fail-OPEN: claim does not crash, still claims.
TC-19 snapshot() shape + never-raise.
TC-21 STAGE_TRANSITIONS / QG_CHECKS registries unchanged (no new QG check).
"""
import os
import tempfile
import pytest
# Configure the process environment BEFORE the project imports that follow
# (which is why those imports carry `noqa: E402`) — presumably the project's
# settings are read from the environment at import time; TODO confirm.
# ORCH_DB_PATH is always overwritten with a file in the system temp directory.
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate.db")
# Dummy tokens are filled in only when the variables are not already set.
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import serial_gate # noqa: E402
from src import config as cfg # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give each test its own SQLite file and a fixed gate configuration.

    Runs automatically for every test in this module: repoints
    ``db.settings.db_path`` at ``<tmp_path>/sg.db``, applies the settings
    overrides listed below, then calls ``init_db()`` before yielding.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "sg.db"))

    overrides = (
        # Feature ON by default; empty CSV (all repos); freeze layer ON.
        ("serial_gate_enabled", True),
        ("serial_gate_repos", ""),
        ("serial_gate_freeze_enabled", True),
        # Keep the unrelated dep-gate inert so claim semantics isolate the
        # serial gate.
        ("task_deps_enabled", False),
    )
    for option, value in overrides:
        monkeypatch.setattr(cfg.settings, option, value, raising=False)

    init_db()
    yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    """Insert one row into ``tasks`` and return its row id.

    ``work_item_id`` is reused as ``plane_id`` and ``title``; the branch is
    ``feature/<work_item_id>``.

    Args:
        work_item_id: Identifier stored in ``plane_id``, ``work_item_id`` and
            ``title``.
        stage: Value for the ``stage`` column.
        repo: Value for the ``repo`` column.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        # Always release the connection, even when the INSERT or commit fails,
        # so a failing test does not leave an open handle on the DB file.
        conn.close()
def _set_stage(task_id, stage):
    """Set the ``stage`` column of the ``tasks`` row whose ``id`` is ``task_id``.

    Args:
        task_id: Primary key of the row to update.
        stage: New value for the ``stage`` column.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
        conn.commit()
    finally:
        # Close the connection on the error path as well as on success.
        conn.close()
# --------------------------------------------------------------- TC-01
def test_gate_closed_when_repo_has_active_task():
    """TC-01: a new task's analyst-job is not claimed while task A is unfinished."""
    _make_task("ORCH-201", stage="development")  # active predecessor (task A)
    new_task = _make_task("ORCH-202", stage="analysis")  # new task (task B)
    waiting_job = enqueue_job("analyst", "orchestrator", "B", task_id=new_task)

    # A is unfinished -> the analyst-job of B is NOT claimable.
    claimed = claim_next_job()
    assert claimed is None, "analyst-job must be gated by active task A"

    # The snapshot reports A as the repo's active task and B's job as waiting.
    repo_view = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert repo_view["active_task"]["work_item_id"] == "ORCH-201"
    assert waiting_job in (entry["job_id"] for entry in repo_view["waiting"])
# --------------------------------------------------------------- TC-02
def test_serial_gate_applies_scopes(monkeypatch):
    """TC-02: scope of ``serial_gate_applies`` under the three configurations."""
    applies = serial_gate.serial_gate_applies

    # enabled + empty CSV -> all repos.
    for repo in ("orchestrator", "enduro-trails"):
        assert applies(repo) is True

    # CSV membership: only the listed repo is in scope.
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "orchestrator", raising=False)
    for repo, expected in (("orchestrator", True), ("enduro-trails", False)):
        assert applies(repo) is expected

    # kill-switch off -> never applies, even for a repo listed in the CSV.
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)
    assert applies("orchestrator") is False
# --------------------------------------------------------------- TC-03
def test_non_analyst_job_of_active_task_passes():
    """TC-03: later-stage jobs of the already-active task are claimed one by one."""
    active_task = _make_task("ORCH-210", stage="development")
    # An unrelated unfinished task in the same repo (would close the gate for
    # an analyst-job).
    _make_task("ORCH-211", stage="analysis")

    later_roles = ("architect", "developer", "reviewer", "tester", "deployer")
    for role in later_roles:
        queued_id = enqueue_job(role, "orchestrator", role, task_id=active_task)
        got = claim_next_job()
        assert got is not None and got["id"] == queued_id, (
            f"{role}-job of an active task must never be gated"
        )
        # Finish it so the next role's job is the only queued one.
        db.mark_job(queued_id, "done")
# --------------------------------------------------------------- TC-08
def test_per_repo_isolation():
    """TC-08: an active orchestrator task does not block an enduro-trails analyst-job."""
    failure_note = "orchestrator's active task must not gate enduro's analyst-job"

    # orchestrator is busy; enduro-trails gets a brand-new analyst-job.
    _make_task("ORCH-220", stage="development", repo="orchestrator")
    enduro_task = _make_task("ET-220", stage="analysis", repo="enduro-trails")
    expected_id = enqueue_job("analyst", "enduro-trails", "B", task_id=enduro_task)

    got = claim_next_job()
    assert got is not None and got["id"] == expected_id, failure_note
# --------------------------------------------------------------- TC-15
def test_kill_switch_off_is_inert(monkeypatch):
    """TC-15: with ``serial_gate_enabled`` False the analyst-job is claimed anyway."""
    failure_note = "with the kill-switch off the gate must be inert (claims as before)"
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)

    _make_task("ORCH-230", stage="development")  # active task
    newcomer = _make_task("ORCH-231", stage="analysis")
    expected_id = enqueue_job("analyst", "orchestrator", "B", task_id=newcomer)

    got = claim_next_job()
    assert got is not None and got["id"] == expected_id, failure_note
# --------------------------------------------------------------- TC-16
def test_repo_outside_csv_not_gated(monkeypatch):
    """TC-16: a repo missing from a non-empty CSV scope is not gated."""
    failure_note = "orchestrator is outside the CSV scope -> gate must not apply"
    # Scope the gate to enduro-trails only; the tasks below live in orchestrator.
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "enduro-trails", raising=False)

    _make_task("ORCH-240", stage="development")  # active orchestrator task
    newcomer = _make_task("ORCH-241", stage="analysis")
    expected_id = enqueue_job("analyst", "orchestrator", "B", task_id=newcomer)

    got = claim_next_job()
    assert got is not None and got["id"] == expected_id, failure_note
# --------------------------------------------------------------- TC-17
def test_build_clause_error_fails_open(monkeypatch):
    """A build error in the gate clause must fail-OPEN (claim still proceeds).

    TC-17: ``serial_gate.build_claim_clause`` is replaced with a stub that
    always raises ``RuntimeError``; ``claim_next_job`` must still return the
    queued analyst-job.
    """
    _make_task("ORCH-250", stage="development")  # would close the gate
    b = _make_task("ORCH-251", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    def _boom(*_args, **_kwargs):
        # Accept any call signature so the injected failure is always this
        # RuntimeError, never a TypeError caused by an arity mismatch.
        raise RuntimeError("clause build down")

    monkeypatch.setattr(serial_gate, "build_claim_clause", _boom, raising=True)

    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a gate build error must fail-OPEN, not wedge the queue (AC-8)"
    )
# --------------------------------------------------------------- TC-19
def test_snapshot_shape_and_never_raises(monkeypatch):
    """TC-19: ``snapshot()`` has the expected keys and survives a lookup failure."""
    snap = serial_gate.snapshot()
    assert snap["enabled"] is True
    assert "repos" in snap and "freeze_enabled" in snap
    assert isinstance(snap["per_repo"], dict)

    def _db_down(*_args, **_kwargs):
        # Accept any call signature so the injected failure is always this
        # RuntimeError, never a TypeError caused by an arity mismatch.
        raise RuntimeError("db down")

    # never-raise: a DB failure -> minimal dict with flags, empty per_repo.
    monkeypatch.setattr(serial_gate, "_known_repos", _db_down, raising=True)
    snap2 = serial_gate.snapshot()
    assert snap2["per_repo"] == {}
    assert snap2["enabled"] is True
# --------------------------------------------------------------- TC-21
def test_registries_unchanged():
    """TC-21: pin the ``STAGE_TRANSITIONS`` key set; no serial-gate QG check exists."""
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS

    # ORCH-090 (adr-0026): `cancelled` is added as a terminal SINK (parallel to
    # `done`), NOT a new pipeline edge — serial-gate FIFO semantics are unchanged.
    expected_stages = {
        "created",
        "analysis",
        "architecture",
        "development",
        "review",
        "testing",
        "deploy-staging",
        "deploy",
        "done",
        "cancelled",
    }
    assert set(STAGE_TRANSITIONS) == expected_stages

    # No serial-gate QG check was introduced (the gate is a scheduler condition).
    serial_checks = [key for key in QG_CHECKS if "serial" in key]
    assert not serial_checks, "no new QG check expected"