"""ORCH-088 — rollback-freeze layer (FR-5) tests (real tmp SQLite). Covers (04-test-plan.yaml): TC-07 freeze survives a restart (durable in DB) — next task stays gated. TC-09 freeze of orchestrator does NOT affect enduro-trails (per-repo). TC-10 post-deploy DEGRADED -> durable freeze row + Telegram alert attempted. TC-11 an active freeze gates the next analyst-job even with NO unfinished task (the degraded task is already done — BR-7). TC-12 manual clear_repo_freeze -> next task is claimable again. TC-18 is_repo_frozen fails CLOSED on a read error (frozen=True on doubt). TC-22 repo_freeze migration is idempotent (re-init does not dup / crash). """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_freeze.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 from src import serial_gate # noqa: E402 from src import config as cfg # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "freeze.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) init_db() yield def _make_task(work_item_id, stage="analysis", repo="orchestrator"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage), ) tid = cur.lastrowid conn.commit() conn.close() return tid # --------------------------------------------------------------- TC-07 def test_freeze_survives_restart(): b = _make_task("ORCH-401", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) assert serial_gate.set_repo_freeze("orchestrator", "post-deploy DEGRADED", "ORCH-400") is True assert claim_next_job() is None, "frozen repo gates the analyst-job" # Simulate restart: no in-memory state, re-init (idempotent) -> still frozen. init_db() assert serial_gate.is_repo_frozen("orchestrator") is True assert claim_next_job() is None, "freeze is durable across restart" assert job_b # referenced # --------------------------------------------------------------- TC-09 def test_freeze_is_per_repo(): serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-410") b = _make_task("ET-410", stage="analysis", repo="enduro-trails") job_b = enqueue_job("analyst", "enduro-trails", "B", task_id=b) claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b, ( "an orchestrator freeze must not gate enduro-trails" ) assert serial_gate.is_repo_frozen("enduro-trails") is False # --------------------------------------------------------------- TC-10 def test_post_deploy_degraded_sets_freeze_and_alerts(tmp_path, monkeypatch): from src import stage_engine, post_deploy # Sandbox the post-deploy sentinel state dir so a prior DONE marker can't # short-circuit the tick (state lives under settings.repos_dir). monkeypatch.setattr(post_deploy.settings, "repos_dir", str(tmp_path), raising=False) a = _make_task("ORCH-420", stage="done", repo="orchestrator") job = {"task_id": a, "repo": "orchestrator"} # Avoid network / git / worktree; force a DEGRADED verdict. monkeypatch.setattr(post_deploy, "probe_signals", lambda *a, **k: post_deploy.ProbeResult(False, 2, 2, "down")) monkeypatch.setattr(post_deploy, "classify", lambda *a, **k: post_deploy.DEGRADED) monkeypatch.setattr(post_deploy, "write_post_deploy_log", lambda *a, **k: True) monkeypatch.setattr(stage_engine, "set_issue_blocked", lambda *a, **k: None) alerts = [] monkeypatch.setattr(stage_engine, "_notify_post_deploy", lambda wi, msg: alerts.append(msg)) stage_engine.run_post_deploy_monitor(job) # Durable freeze row written + a freeze alert attempted. assert serial_gate.is_repo_frozen("orchestrator") is True assert any("ЗАМОРОЖЕН" in m for m in alerts), f"freeze alert missing: {alerts}" # --------------------------------------------------------------- TC-11 def test_freeze_gates_even_without_unfinished_task(): _make_task("ORCH-430", stage="done") # degraded task already done b = _make_task("ORCH-431", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) # Without freeze B would be claimable (A done, no earlier unfinished). Freeze it. serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-430") assert claim_next_job() is None, "active freeze gates the next analyst-job (BR-7)" assert job_b # --------------------------------------------------------------- TC-12 def test_manual_unfreeze_lets_next_start(): _make_task("ORCH-440", stage="done") b = _make_task("ORCH-441", stage="analysis") job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-440") assert claim_next_job() is None cleared = serial_gate.clear_repo_freeze("orchestrator") assert cleared >= 1 assert serial_gate.is_repo_frozen("orchestrator") is False claimed = claim_next_job() assert claimed is not None and claimed["id"] == job_b # Idempotent: clearing again clears nothing. assert serial_gate.clear_repo_freeze("orchestrator") == 0 # --------------------------------------------------------------- TC-18 def test_is_repo_frozen_fails_closed(monkeypatch): def _boom(repo): raise RuntimeError("freeze read down") monkeypatch.setattr(serial_gate, "_active_freeze_row", _boom, raising=True) # Freeze layer enabled + cannot confirm absence -> fail CLOSED (True). assert serial_gate.is_repo_frozen("orchestrator") is True # Freeze layer OFF -> never frozen, even on a read error. monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", False, raising=False) assert serial_gate.is_repo_frozen("orchestrator") is False # --------------------------------------------------------------- TC-22 def test_repo_freeze_migration_idempotent(): # Re-running init_db must not crash or duplicate the table/index. init_db() init_db() conn = get_db() cols = [r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()] conn.close() assert {"repo", "frozen_at", "reason", "work_item_id", "cleared_at"}.issubset(set(cols)) # A freeze still functions after repeated migration. assert serial_gate.set_repo_freeze("orchestrator", "x", "ORCH-450") is True assert serial_gate.is_repo_frozen("orchestrator") is True