Files
orchestrator/tests/test_serial_gate.py
claude-bot ee4773f5b0 feat(serial-gate): per-repo serial gate + deferred branch cut + rollback-freeze (ORCH-088)
Этап 1 (serial e2e) пакетного автономного режима. Новая задача репо не входит
в analysis (analyst-job не выбирается, ветка не режется), пока в репо есть более
ранняя незавершённая задача (FIFO, t2.id < jobs.task_id) ИЛИ репо заморожен.

- src/serial_gate.py — новый leaf (never-raise): build_claim_clause (fail-OPEN),
  is_repo_frozen (fail-CLOSED), set/clear_repo_freeze, serial_gate_applies, snapshot.
- src/db.py — идемпотентная миграция repo_freeze + serial_gate-фрагмент в claim_next_job.
- src/webhooks/plane.py + src/agents/launcher.py — отложенный срез ветки: start_pipeline
  не создаёт Gitea-ветку/docs для применимого репо; релокация в _materialize_deferred_branch
  на момент claim analyst-job (база = свежий origin/main с кодом предшественника, AC-6).
- src/stage_engine.py — post-deploy DEGRADED → durable per-repo freeze + Telegram-алерт.
- src/main.py — блок serial_gate в GET /queue + POST /serial-gate/unfreeze.
- src/config.py — serial_gate_enabled / serial_gate_repos / serial_gate_freeze_enabled.

FIFO-уточнение реализации (FR-2): ADR-001 D1 фиксировал t2.id != jobs.task_id; при !=
пакет одновременно созданных свежих задач взаимно блокировался бы (дедлок). t2.id <
jobs.task_id допускает самую раннюю задачу и сериализует остальные, сохраняя AC-1/R-7.

STAGE_TRANSITIONS / QG_CHECKS / check_* — без изменений. Аддитивно, под kill-switch,
never-raise, restart-safe; при выключенном флаге — нулевая регрессия (enduro не затронут).

Тесты: TC-01..TC-22 (test_serial_gate*.py + test_queue_endpoint.py); полный прогон 1114 зелёных.
Docs: README (serial gate / /queue / API / БД), CLAUDE.md, CHANGELOG.md, .env.example.

Refs: ORCH-088
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 11:24:48 +03:00

189 lines
8.2 KiB
Python

"""ORCH-088 — per-repo serial gate, unit tests (real tmp SQLite).

Covers (04-test-plan.yaml):
  TC-01  claim_next_job does NOT claim an analyst-job of a NEW task B while the
         repo has an unfinished task A (gate closed).
  TC-02  serial_gate_applies: enabled + empty CSV -> True for any repo; CSV
         membership -> True; repo outside CSV -> False; disabled -> False.
  TC-03  jobs of an ALREADY-active task (architect/developer/.../deployer) are
         never gated — the single active task advances freely.
  TC-08  per-repo: an active orchestrator task does NOT gate an enduro analyst-job.
  TC-15  kill-switch off -> claim is 1:1 as before ORCH-088.
  TC-16  repo outside a non-empty CSV -> gate inert for that repo.
  TC-17  DB/build error in the gate -> fail-OPEN: claim does not crash, still claims.
  TC-19  snapshot() shape + never-raise.
  TC-21  STAGE_TRANSITIONS / QG_CHECKS registries unchanged (no new QG check).
"""
import os
import tempfile
import pytest
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import serial_gate # noqa: E402
from src import config as cfg # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test its own database file and a known gate configuration.

    Runs automatically for each test: redirects ``db.settings.db_path`` to a
    file under pytest's ``tmp_path``, pins the serial-gate settings, and then
    calls ``init_db()`` before handing control to the test.
    """
    # Per-test database file so tests never see each other's rows.
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "sg.db"))

    settings_overrides = {
        # Feature ON by default; empty CSV (all repos); freeze layer ON.
        "serial_gate_enabled": True,
        "serial_gate_repos": "",
        "serial_gate_freeze_enabled": True,
        # Keep the unrelated dep-gate inert so claim semantics isolate the
        # serial gate.
        "task_deps_enabled": False,
    }
    for option, value in settings_overrides.items():
        # raising=False: the attribute may not exist on the settings object.
        monkeypatch.setattr(cfg.settings, option, value, raising=False)

    init_db()
    yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    """Insert one row into ``tasks`` and return its row id.

    Args:
        work_item_id: Used for ``plane_id``, ``work_item_id`` and ``title``;
            the branch is stored as ``feature/<work_item_id>``.
        stage: Value written to the ``stage`` column.
        repo: Value written to the ``repo`` column.

    Returns:
        ``lastrowid`` of the INSERT cursor, i.e. the id of the new task row.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id),
        )
        tid = cur.lastrowid
        conn.commit()
    finally:
        # Always release the connection, even when the INSERT or commit fails,
        # so a failing test does not leave an open handle on the DB file.
        conn.close()
    return tid
def _set_stage(task_id, stage):
    """Set the ``stage`` column of the task row with id ``task_id``.

    Args:
        task_id: Primary key (``tasks.id``) of the row to update.
        stage: New value for the ``stage`` column.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
        conn.commit()
    finally:
        # Always release the connection, even when the UPDATE or commit fails.
        conn.close()
# --------------------------------------------------------------- TC-01
def test_gate_closed_when_repo_has_active_task():
    """TC-01: a new task's analyst-job is not claimable while an earlier task is unfinished."""
    # Active predecessor A, inserted first.
    _make_task("ORCH-201", stage="development")
    # New task B in the same repo, with a queued analyst-job.
    new_task_id = _make_task("ORCH-202", stage="analysis")
    waiting_job_id = enqueue_job("analyst", "orchestrator", "B", task_id=new_task_id)

    # A is unfinished -> the analyst-job of B is NOT claimable.
    claimed = claim_next_job()
    assert claimed is None, "analyst-job must be gated by active task A"

    # The snapshot reports A as the repo's active task and B's job as waiting.
    repo_view = serial_gate.snapshot()["per_repo"]["orchestrator"]
    assert repo_view["active_task"]["work_item_id"] == "ORCH-201"
    waiting_ids = [entry["job_id"] for entry in repo_view["waiting"]]
    assert waiting_job_id in waiting_ids
# --------------------------------------------------------------- TC-02
def test_serial_gate_applies_scopes(monkeypatch):
    """TC-02: ``serial_gate_applies`` honours the enabled flag and the repo CSV."""
    applies = serial_gate.serial_gate_applies

    # enabled + empty CSV -> all repos.
    for repo in ("orchestrator", "enduro-trails"):
        assert applies(repo) is True

    # CSV membership: only the listed repo is in scope.
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "orchestrator", raising=False)
    expected_by_repo = {"orchestrator": True, "enduro-trails": False}
    for repo, expected in expected_by_repo.items():
        assert applies(repo) is expected

    # kill-switch off -> never applies, even for a repo listed in the CSV.
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)
    assert applies("orchestrator") is False
# --------------------------------------------------------------- TC-03
def test_non_analyst_job_of_active_task_passes():
    """TC-03: non-analyst jobs of an already-active task are claimed normally."""
    active_task_id = _make_task("ORCH-210", stage="development")
    # An unrelated unfinished task in the same repo (would close the gate for
    # an analyst-job).
    _make_task("ORCH-211", stage="analysis")

    later_roles = ("architect", "developer", "reviewer", "tester", "deployer")
    for role in later_roles:
        job_id = enqueue_job(role, "orchestrator", role, task_id=active_task_id)
        job = claim_next_job()
        failure_note = f"{role}-job of an active task must never be gated"
        assert job is not None, failure_note
        assert job["id"] == job_id, failure_note
        # Finish it so the next role's job is the only queued one.
        db.mark_job(job_id, "done")
# --------------------------------------------------------------- TC-08
def test_per_repo_isolation():
    """TC-08: an active task in one repo does not gate another repo's analyst-job."""
    # orchestrator is busy with a task in development ...
    _make_task("ORCH-220", stage="development", repo="orchestrator")
    # ... while enduro-trails gets a brand-new task with an analyst-job.
    enduro_task_id = _make_task("ET-220", stage="analysis", repo="enduro-trails")
    enduro_job_id = enqueue_job("analyst", "enduro-trails", "B", task_id=enduro_task_id)

    job = claim_next_job()
    failure_note = "orchestrator's active task must not gate enduro's analyst-job"
    assert job is not None, failure_note
    assert job["id"] == enduro_job_id, failure_note
# --------------------------------------------------------------- TC-15
def test_kill_switch_off_is_inert(monkeypatch):
    """TC-15: with ``serial_gate_enabled`` False the analyst-job is claimed despite an active task."""
    monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)

    # Same layout that closes the gate in TC-01: active task, then a new one.
    _make_task("ORCH-230", stage="development")
    new_task_id = _make_task("ORCH-231", stage="analysis")
    analyst_job_id = enqueue_job("analyst", "orchestrator", "B", task_id=new_task_id)

    job = claim_next_job()
    failure_note = "with the kill-switch off the gate must be inert (claims as before)"
    assert job is not None, failure_note
    assert job["id"] == analyst_job_id, failure_note
# --------------------------------------------------------------- TC-16
def test_repo_outside_csv_not_gated(monkeypatch):
    """TC-16: a repo missing from a non-empty ``serial_gate_repos`` CSV is not gated."""
    # Scope the gate to enduro-trails only; orchestrator is outside the CSV.
    monkeypatch.setattr(cfg.settings, "serial_gate_repos", "enduro-trails", raising=False)

    # Active orchestrator task followed by a new task with an analyst-job.
    _make_task("ORCH-240", stage="development")
    new_task_id = _make_task("ORCH-241", stage="analysis")
    analyst_job_id = enqueue_job("analyst", "orchestrator", "B", task_id=new_task_id)

    job = claim_next_job()
    failure_note = "orchestrator is outside the CSV scope -> gate must not apply"
    assert job is not None, failure_note
    assert job["id"] == analyst_job_id, failure_note
# --------------------------------------------------------------- TC-17
def test_build_clause_error_fails_open(monkeypatch):
    """A build error in the gate clause must fail-OPEN (claim still proceeds)."""
    _make_task("ORCH-250", stage="development")  # would close the gate
    b = _make_task("ORCH-251", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)

    def _boom(*args, **kwargs):
        # Accept any call signature so the injected failure is always the
        # intended RuntimeError, never an incidental TypeError raised because
        # the caller passed arguments the stub did not declare.
        raise RuntimeError("clause build down")

    # raising=True: fail loudly if build_claim_clause is ever renamed/removed.
    monkeypatch.setattr(serial_gate, "build_claim_clause", _boom, raising=True)

    claimed = claim_next_job()
    assert claimed is not None and claimed["id"] == job_b, (
        "a gate build error must fail-OPEN, not wedge the queue (AC-8)"
    )
# --------------------------------------------------------------- TC-19
def test_snapshot_shape_and_never_raises(monkeypatch):
    """TC-19: ``snapshot()`` exposes the expected keys and survives an internal failure."""
    snap = serial_gate.snapshot()
    assert snap["enabled"] is True
    assert "repos" in snap and "freeze_enabled" in snap
    assert isinstance(snap["per_repo"], dict)

    def _db_down(*args, **kwargs):
        # Accept any call signature so the injected failure is always the
        # intended RuntimeError, regardless of how _known_repos is invoked.
        raise RuntimeError("db down")

    # never-raise: a DB failure -> minimal dict with flags, empty per_repo.
    monkeypatch.setattr(serial_gate, "_known_repos", _db_down, raising=True)
    snap2 = serial_gate.snapshot()
    assert snap2["per_repo"] == {}
    assert snap2["enabled"] is True
# --------------------------------------------------------------- TC-21
def test_registries_unchanged():
    """TC-21: the stage registry keys are unchanged and no serial-gate QG check exists."""
    from src.qg.checks import QG_CHECKS
    from src.stages import STAGE_TRANSITIONS

    expected_stages = {
        "created",
        "analysis",
        "architecture",
        "development",
        "review",
        "testing",
        "deploy-staging",
        "deploy",
        "done",
    }
    assert set(STAGE_TRANSITIONS) == expected_stages

    # No serial-gate QG check was introduced (the gate is a scheduler condition).
    serial_checks = [key for key in QG_CHECKS if "serial" in key]
    assert not serial_checks, "no new QG check expected"