Files
orchestrator/tests/test_serial_gate_freeze.py
claude-bot ee4773f5b0 feat(serial-gate): per-repo serial gate + deferred branch cut + rollback-freeze (ORCH-088)
Этап 1 (serial e2e) пакетного автономного режима. Новая задача репо не входит
в analysis (analyst-job не выбирается, ветка не режется), пока в репо есть более
ранняя незавершённая задача (FIFO, t2.id < jobs.task_id) ИЛИ репо заморожен.

- src/serial_gate.py — новый leaf (never-raise): build_claim_clause (fail-OPEN),
  is_repo_frozen (fail-CLOSED), set/clear_repo_freeze, serial_gate_applies, snapshot.
- src/db.py — идемпотентная миграция repo_freeze + serial_gate-фрагмент в claim_next_job.
- src/webhooks/plane.py + src/agents/launcher.py — отложенный срез ветки: start_pipeline
  не создаёт Gitea-ветку/docs для применимого репо; релокация в _materialize_deferred_branch
  на момент claim analyst-job (база = свежий origin/main с кодом предшественника, AC-6).
- src/stage_engine.py — post-deploy DEGRADED → durable per-repo freeze + Telegram-алерт.
- src/main.py — блок serial_gate в GET /queue + POST /serial-gate/unfreeze.
- src/config.py — serial_gate_enabled / serial_gate_repos / serial_gate_freeze_enabled.

FIFO-уточнение реализации (FR-2): ADR-001 D1 фиксировал t2.id != jobs.task_id; при !=
пакет одновременно созданных свежих задач взаимно блокировался бы (дедлок). t2.id <
jobs.task_id допускает самую раннюю задачу и сериализует остальные, сохраняя AC-1/R-7.

STAGE_TRANSITIONS / QG_CHECKS / check_* — без изменений. Аддитивно, под kill-switch,
never-raise, restart-safe; при выключенном флаге — нулевая регрессия (enduro не затронут).

Тесты: TC-01..TC-22 (test_serial_gate*.py + test_queue_endpoint.py); полный прогон 1114 зелёных.
Docs: README (serial gate / /queue / API / БД), CLAUDE.md, CHANGELOG.md, .env.example.

Refs: ORCH-088
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 11:24:48 +03:00

161 lines
7.1 KiB
Python

"""ORCH-088 — rollback-freeze layer (FR-5) tests (real tmp SQLite).
Covers (04-test-plan.yaml):
TC-07 freeze survives a restart (durable in DB) — next task stays gated.
TC-09 freeze of orchestrator does NOT affect enduro-trails (per-repo).
TC-10 post-deploy DEGRADED -> durable freeze row + Telegram alert attempted.
TC-11 an active freeze gates the next analyst-job even with NO unfinished task
(the degraded task is already done — BR-7).
TC-12 manual clear_repo_freeze -> next task is claimable again.
TC-18 is_repo_frozen fails CLOSED on a read error (frozen=True on doubt).
TC-22 repo_freeze migration is idempotent (re-init does not dup / crash).
"""
import os
import tempfile
import pytest
# The ORCH_* environment values are assigned before the project imports below,
# which is why those imports carry `noqa: E402` (import not at top of file).
# NOTE(review): presumably src.* reads these variables at import time — confirm.
# The autouse fresh_db fixture later repoints db.settings.db_path at a per-test
# file, so this temp-dir path only serves as the import-time value.
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_freeze.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
from src import serial_gate # noqa: E402
from src import config as cfg # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Point the DB layer at a per-test SQLite file and switch the gate on.

    Runs automatically for every test in this module. The database path is
    redirected to ``freeze.db`` under pytest's ``tmp_path``; the serial gate
    and its freeze layer are enabled, the repo filter is set to an empty
    string, and task dependencies are disabled. ``init_db()`` then creates
    the schema in the new file. There is no explicit teardown after the
    ``yield``.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "freeze.db"))

    # Applied in this exact order; raising=False tolerates a settings object
    # that does not define the attribute yet.
    overrides = (
        ("serial_gate_enabled", True),
        ("serial_gate_repos", ""),
        ("serial_gate_freeze_enabled", True),
        ("task_deps_enabled", False),
    )
    for attr_name, attr_value in overrides:
        monkeypatch.setattr(cfg.settings, attr_name, attr_value, raising=False)

    init_db()
    yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    """Insert one row into ``tasks`` and return its row id.

    Args:
        work_item_id: Stored as both ``plane_id`` and ``work_item_id``; also
            used to build the branch name ``feature/<work_item_id>``.
        stage: Value for the ``stage`` column.
        repo: Value for the ``repo`` column.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        # Close even when the INSERT or commit raises, so a failing test does
        # not leave an open handle on the per-test database file.
        conn.close()
# --------------------------------------------------------------- TC-07
def test_freeze_survives_restart():
    """TC-07: a freeze still gates the analyst-job after ``init_db()`` re-runs."""
    task_id = _make_task("ORCH-401", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=task_id)

    freeze_written = serial_gate.set_repo_freeze(
        "orchestrator", "post-deploy DEGRADED", "ORCH-400"
    )
    assert freeze_written is True

    claimed_while_frozen = claim_next_job()
    assert claimed_while_frozen is None, "frozen repo gates the analyst-job"

    # A restart is simulated by re-running the (idempotent) initialisation;
    # nothing is kept in memory between the two claim attempts.
    init_db()
    still_frozen = serial_gate.is_repo_frozen("orchestrator")
    assert still_frozen is True

    claimed_after_restart = claim_next_job()
    assert claimed_after_restart is None, "freeze is durable across restart"

    assert job_b  # referenced
# --------------------------------------------------------------- TC-09
def test_freeze_is_per_repo():
    """TC-09: freezing ``orchestrator`` leaves ``enduro-trails`` claimable."""
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-410")

    other_repo = "enduro-trails"
    task_id = _make_task("ET-410", stage="analysis", repo=other_repo)
    job_b = enqueue_job("analyst", other_repo, "B", task_id=task_id)

    claimed = claim_next_job()
    got_other_repo_job = claimed is not None and claimed["id"] == job_b
    assert got_other_repo_job, "an orchestrator freeze must not gate enduro-trails"

    other_repo_frozen = serial_gate.is_repo_frozen(other_repo)
    assert other_repo_frozen is False
# --------------------------------------------------------------- TC-10
def test_post_deploy_degraded_sets_freeze_and_alerts(tmp_path, monkeypatch):
    """TC-10: a DEGRADED post-deploy verdict writes a freeze and sends an alert.

    Probing, classification, log writing, issue blocking and notification are
    all stubbed, so the monitor runs without network, git or worktree access.
    """
    from src import stage_engine, post_deploy

    # Sandbox the post-deploy sentinel state dir so a prior DONE marker can't
    # short-circuit the tick (state lives under settings.repos_dir).
    monkeypatch.setattr(post_deploy.settings, "repos_dir", str(tmp_path), raising=False)

    done_task_id = _make_task("ORCH-420", stage="done", repo="orchestrator")
    job = {"task_id": done_task_id, "repo": "orchestrator"}

    alerts = []

    def _fake_probe(*args, **kwargs):
        return post_deploy.ProbeResult(False, 2, 2, "down")

    def _fake_classify(*args, **kwargs):
        return post_deploy.DEGRADED

    def _fake_write_log(*args, **kwargs):
        return True

    def _fake_set_blocked(*args, **kwargs):
        return None

    def _capture_alert(wi, msg):
        alerts.append(msg)

    # Force a DEGRADED verdict and capture whatever the monitor tries to send.
    monkeypatch.setattr(post_deploy, "probe_signals", _fake_probe)
    monkeypatch.setattr(post_deploy, "classify", _fake_classify)
    monkeypatch.setattr(post_deploy, "write_post_deploy_log", _fake_write_log)
    monkeypatch.setattr(stage_engine, "set_issue_blocked", _fake_set_blocked)
    monkeypatch.setattr(stage_engine, "_notify_post_deploy", _capture_alert)

    stage_engine.run_post_deploy_monitor(job)

    # Durable freeze row written + a freeze alert attempted.
    assert serial_gate.is_repo_frozen("orchestrator") is True
    freeze_alert_seen = any("ЗАМОРОЖЕН" in m for m in alerts)
    assert freeze_alert_seen, f"freeze alert missing: {alerts}"
# --------------------------------------------------------------- TC-11
def test_freeze_gates_even_without_unfinished_task():
    """TC-11: the freeze alone blocks the claim when the earlier task is done."""
    # The degraded task is already in the "done" stage.
    _make_task("ORCH-430", stage="done")
    next_task_id = _make_task("ORCH-431", stage="analysis")
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=next_task_id)

    # Without the freeze B would be claimable (A done, no earlier unfinished).
    serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-430")

    claimed = claim_next_job()
    assert claimed is None, "active freeze gates the next analyst-job (BR-7)"
    assert job_b
# --------------------------------------------------------------- TC-12
def test_manual_unfreeze_lets_next_start():
    """TC-12: clearing the freeze makes the queued analyst-job claimable again."""
    repo = "orchestrator"
    _make_task("ORCH-440", stage="done")
    pending_task_id = _make_task("ORCH-441", stage="analysis")
    job_b = enqueue_job("analyst", repo, "B", task_id=pending_task_id)

    serial_gate.set_repo_freeze(repo, "DEGRADED", "ORCH-440")
    blocked_claim = claim_next_job()
    assert blocked_claim is None

    cleared = serial_gate.clear_repo_freeze(repo)
    assert cleared >= 1
    repo_frozen = serial_gate.is_repo_frozen(repo)
    assert repo_frozen is False

    claimed = claim_next_job()
    claimed_expected_job = claimed is not None and claimed["id"] == job_b
    assert claimed_expected_job

    # Idempotent: clearing again clears nothing.
    second_clear = serial_gate.clear_repo_freeze(repo)
    assert second_clear == 0
# --------------------------------------------------------------- TC-18
def test_is_repo_frozen_fails_closed(monkeypatch):
    """TC-18: a failing freeze read reports frozen unless the layer is off."""

    def _failing_read(repo):
        raise RuntimeError("freeze read down")

    monkeypatch.setattr(serial_gate, "_active_freeze_row", _failing_read, raising=True)

    # Freeze layer enabled + cannot confirm absence -> fail CLOSED (True).
    frozen_on_read_error = serial_gate.is_repo_frozen("orchestrator")
    assert frozen_on_read_error is True

    # Freeze layer OFF -> never frozen, even on a read error.
    monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", False, raising=False)
    frozen_with_layer_off = serial_gate.is_repo_frozen("orchestrator")
    assert frozen_with_layer_off is False
# --------------------------------------------------------------- TC-22
def test_repo_freeze_migration_idempotent():
    """TC-22: repeated ``init_db()`` calls keep ``repo_freeze`` usable.

    The fixture has already initialised the schema once; two more runs must
    not raise. Afterwards the table must still expose the expected columns
    and a freeze must still be writable and readable.
    """
    # Re-running init_db must not crash or duplicate the table/index.
    init_db()
    init_db()

    conn = get_db()
    try:
        # PRAGMA table_info rows carry the column name at index 1.
        cols = [r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()]
    finally:
        # Release the handle even if the PRAGMA query raises.
        conn.close()
    assert {"repo", "frozen_at", "reason", "work_item_id", "cleared_at"}.issubset(set(cols))

    # A freeze still functions after repeated migration.
    assert serial_gate.set_repo_freeze("orchestrator", "x", "ORCH-450") is True
    assert serial_gate.is_repo_frozen("orchestrator") is True