Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".
Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).
New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.
Refs: ORCH-090
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
191 lines
8.4 KiB
Python
191 lines
8.4 KiB
Python
"""ORCH-088 — per-repo serial gate, unit tests (real tmp SQLite).
|
|
|
|
Covers (04-test-plan.yaml):
|
|
TC-01 claim_next_job does NOT claim an analyst-job of a NEW task B while the
|
|
repo has an unfinished task A (gate closed).
|
|
TC-02 serial_gate_applies: enabled + empty CSV -> True for any repo; CSV
|
|
membership -> True; repo outside CSV -> False; disabled -> False.
|
|
TC-03 jobs of an ALREADY-active task (architect/developer/.../deployer) are
|
|
never gated — the single active task advances freely.
|
|
TC-08 per-repo: an active orchestrator task does NOT gate an enduro analyst-job.
|
|
TC-15 kill-switch off -> claim is 1:1 as before ORCH-088.
|
|
TC-16 repo outside a non-empty CSV -> gate inert for that repo.
|
|
TC-17 DB/build error in the gate -> fail-OPEN: claim does not crash, still claims.
|
|
TC-19 snapshot() shape + never-raise.
|
|
TC-21 STAGE_TRANSITIONS / QG_CHECKS registries unchanged (no new QG check).
|
|
"""
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate.db")
|
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
|
|
import src.db as db # noqa: E402
|
|
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
|
|
from src import serial_gate # noqa: E402
|
|
from src import config as cfg # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def fresh_db(tmp_path, monkeypatch):
|
|
dbfile = tmp_path / "sg.db"
|
|
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
|
|
# Feature ON by default; freeze layer ON; empty CSV (all repos).
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False)
|
|
# Keep the unrelated dep-gate inert so claim semantics isolate the serial gate.
|
|
monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
|
|
init_db()
|
|
yield
|
|
|
|
|
|
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
|
|
conn = get_db()
|
|
cur = conn.execute(
|
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
|
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
|
(work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
|
|
)
|
|
tid = cur.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
return tid
|
|
|
|
|
|
def _set_stage(task_id, stage):
|
|
conn = get_db()
|
|
conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------- TC-01
|
|
def test_gate_closed_when_repo_has_active_task():
|
|
_make_task("ORCH-201", stage="development") # active predecessor
|
|
b = _make_task("ORCH-202", stage="analysis") # new task
|
|
job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
|
|
# A is unfinished -> the analyst-job of B is NOT claimable.
|
|
assert claim_next_job() is None, "analyst-job must be gated by active task A"
|
|
# /queue shows B waiting + an active task for the repo.
|
|
snap = serial_gate.snapshot()
|
|
per = snap["per_repo"]["orchestrator"]
|
|
assert per["active_task"]["work_item_id"] == "ORCH-201"
|
|
assert any(w["job_id"] == job_b for w in per["waiting"])
|
|
|
|
|
|
# --------------------------------------------------------------- TC-02
|
|
def test_serial_gate_applies_scopes(monkeypatch):
|
|
# enabled + empty CSV -> all repos.
|
|
assert serial_gate.serial_gate_applies("orchestrator") is True
|
|
assert serial_gate.serial_gate_applies("enduro-trails") is True
|
|
# CSV membership.
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "orchestrator", raising=False)
|
|
assert serial_gate.serial_gate_applies("orchestrator") is True
|
|
assert serial_gate.serial_gate_applies("enduro-trails") is False
|
|
# kill-switch off -> never applies.
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)
|
|
assert serial_gate.serial_gate_applies("orchestrator") is False
|
|
|
|
|
|
# --------------------------------------------------------------- TC-03
|
|
def test_non_analyst_job_of_active_task_passes():
|
|
a = _make_task("ORCH-210", stage="development")
|
|
# an unrelated unfinished task in the same repo (would close the gate for analyst)
|
|
_make_task("ORCH-211", stage="analysis")
|
|
for role in ("architect", "developer", "reviewer", "tester", "deployer"):
|
|
jid = enqueue_job(role, "orchestrator", role, task_id=a)
|
|
claimed = claim_next_job()
|
|
assert claimed is not None and claimed["id"] == jid, (
|
|
f"{role}-job of an active task must never be gated"
|
|
)
|
|
# finish it so the next role's job is the only queued one.
|
|
db.mark_job(jid, "done")
|
|
|
|
|
|
# --------------------------------------------------------------- TC-08
|
|
def test_per_repo_isolation():
|
|
# orchestrator busy; enduro gets a brand-new analyst-job.
|
|
_make_task("ORCH-220", stage="development", repo="orchestrator")
|
|
b = _make_task("ET-220", stage="analysis", repo="enduro-trails")
|
|
job_b = enqueue_job("analyst", "enduro-trails", "B", task_id=b)
|
|
claimed = claim_next_job()
|
|
assert claimed is not None and claimed["id"] == job_b, (
|
|
"orchestrator's active task must not gate enduro's analyst-job"
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------- TC-15
|
|
def test_kill_switch_off_is_inert(monkeypatch):
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)
|
|
_make_task("ORCH-230", stage="development") # active task
|
|
b = _make_task("ORCH-231", stage="analysis")
|
|
job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
|
|
claimed = claim_next_job()
|
|
assert claimed is not None and claimed["id"] == job_b, (
|
|
"with the kill-switch off the gate must be inert (claims as before)"
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------- TC-16
|
|
def test_repo_outside_csv_not_gated(monkeypatch):
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "enduro-trails", raising=False)
|
|
_make_task("ORCH-240", stage="development") # active orchestrator task
|
|
b = _make_task("ORCH-241", stage="analysis")
|
|
job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
|
|
claimed = claim_next_job()
|
|
assert claimed is not None and claimed["id"] == job_b, (
|
|
"orchestrator is outside the CSV scope -> gate must not apply"
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------- TC-17
|
|
def test_build_clause_error_fails_open(monkeypatch):
|
|
"""A build error in the gate clause must fail-OPEN (claim still proceeds)."""
|
|
_make_task("ORCH-250", stage="development") # would close the gate
|
|
b = _make_task("ORCH-251", stage="analysis")
|
|
job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
|
|
|
|
def _boom():
|
|
raise RuntimeError("clause build down")
|
|
|
|
monkeypatch.setattr(serial_gate, "build_claim_clause", _boom, raising=True)
|
|
claimed = claim_next_job()
|
|
assert claimed is not None and claimed["id"] == job_b, (
|
|
"a gate build error must fail-OPEN, not wedge the queue (AC-8)"
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------- TC-19
|
|
def test_snapshot_shape_and_never_raises(monkeypatch):
|
|
snap = serial_gate.snapshot()
|
|
assert snap["enabled"] is True
|
|
assert "repos" in snap and "freeze_enabled" in snap
|
|
assert isinstance(snap["per_repo"], dict)
|
|
# never-raise: a DB failure -> minimal dict with flags, empty per_repo.
|
|
monkeypatch.setattr(
|
|
serial_gate, "_known_repos",
|
|
lambda: (_ for _ in ()).throw(RuntimeError("db down")),
|
|
raising=True,
|
|
)
|
|
snap2 = serial_gate.snapshot()
|
|
assert snap2["per_repo"] == {}
|
|
assert snap2["enabled"] is True
|
|
|
|
|
|
# --------------------------------------------------------------- TC-21
|
|
def test_registries_unchanged():
|
|
from src.stages import STAGE_TRANSITIONS
|
|
from src.qg.checks import QG_CHECKS
|
|
# ORCH-090 (adr-0026): `cancelled` is added as a terminal SINK (parallel to
|
|
# `done`), NOT a new pipeline edge — serial-gate FIFO semantics are unchanged.
|
|
assert set(STAGE_TRANSITIONS) == {
|
|
"created", "analysis", "architecture", "development", "review",
|
|
"testing", "deploy-staging", "deploy", "done", "cancelled",
|
|
}
|
|
# No serial-gate QG check was introduced (the gate is a scheduler condition).
|
|
assert not any("serial" in k for k in QG_CHECKS), "no new QG check expected"
|