Этап 1 (serial e2e) пакетного автономного режима. Новая задача репо не входит в analysis (analyst-job не выбирается, ветка не режется), пока в репо есть более ранняя незавершённая задача (FIFO, t2.id < jobs.task_id) ИЛИ репо заморожен. - src/serial_gate.py — новый leaf (never-raise): build_claim_clause (fail-OPEN), is_repo_frozen (fail-CLOSED), set/clear_repo_freeze, serial_gate_applies, snapshot. - src/db.py — идемпотентная миграция repo_freeze + serial_gate-фрагмент в claim_next_job. - src/webhooks/plane.py + src/agents/launcher.py — отложенный срез ветки: start_pipeline не создаёт Gitea-ветку/docs для применимого репо; релокация в _materialize_deferred_branch на момент claim analyst-job (база = свежий origin/main с кодом предшественника, AC-6). - src/stage_engine.py — post-deploy DEGRADED → durable per-repo freeze + Telegram-алерт. - src/main.py — блок serial_gate в GET /queue + POST /serial-gate/unfreeze. - src/config.py — serial_gate_enabled / serial_gate_repos / serial_gate_freeze_enabled. FIFO-уточнение реализации (FR-2): ADR-001 D1 фиксировал t2.id != jobs.task_id; при != пакет одновременно созданных свежих задач взаимно блокировался бы (дедлок). t2.id < jobs.task_id допускает самую раннюю задачу и сериализует остальные, сохраняя AC-1/R-7. STAGE_TRANSITIONS / QG_CHECKS / check_* — без изменений. Аддитивно, под kill-switch, never-raise, restart-safe; при выключенном флаге — нулевая регрессия (enduro не затронут). Тесты: TC-01..TC-22 (test_serial_gate*.py + test_queue_endpoint.py); полный прогон 1114 зелёных. Docs: README (serial gate / /queue / API / БД), CLAUDE.md, CHANGELOG.md, .env.example. Refs: ORCH-088 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
154 lines
6.1 KiB
Python
154 lines
6.1 KiB
Python
"""ORCH-088 — deferred branch cut / anti-stale-base (FR-1/AC-6).
|
|
|
|
Covers (04-test-plan.yaml):
|
|
TC-13 while the serial gate applies, start_pipeline does NOT create the Gitea
|
|
branch / initial docs (the cut is deferred to the analyst-job claim);
|
|
with the kill-switch off it creates them immediately (1:1 as before).
|
|
TC-14 a branch cut at claim time (ensure_worktree on a not-yet-existing branch)
|
|
is based on a FRESH origin/main that already contains the predecessor:
|
|
git merge-base --is-ancestor <sha A> <base B> is true.
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_branch.db")
|
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
|
|
import src.db as db # noqa: E402
|
|
from src.db import init_db # noqa: E402
|
|
from src import config as cfg # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def fresh_db(tmp_path, monkeypatch):
|
|
dbfile = tmp_path / "branch.db"
|
|
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False)
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False)
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False)
|
|
monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
|
|
monkeypatch.setattr(cfg.settings, "task_deps_source", "db", raising=False)
|
|
init_db()
|
|
yield
|
|
|
|
|
|
# --------------------------------------------------------------- TC-13
|
|
async def _drive_start_pipeline(monkeypatch, gate_applies: bool):
|
|
from src.webhooks import plane
|
|
from src import plane_sync
|
|
from src.projects import ProjectConfig
|
|
|
|
proj = ProjectConfig(
|
|
plane_project_id="proj-uuid",
|
|
repo="orchestrator",
|
|
work_item_prefix="ORCH",
|
|
name="orch",
|
|
)
|
|
monkeypatch.setattr(plane, "get_project_by_plane_id", lambda pid: proj)
|
|
monkeypatch.setattr(plane, "_qg0_errors", lambda name, desc: [])
|
|
monkeypatch.setattr(plane, "ensure_unique_work_item_id", lambda wid, repo: wid)
|
|
monkeypatch.setattr(
|
|
plane, "create_task_atomic",
|
|
lambda *a, **k: ({"id": 1, "work_item_id": "ORCH-500"}, True),
|
|
)
|
|
monkeypatch.setattr(plane_sync, "fetch_issue_sequence_id", lambda *a, **k: 500)
|
|
monkeypatch.setattr(plane_sync, "set_issue_analysis", lambda *a, **k: None)
|
|
monkeypatch.setattr(plane_sync, "add_comment", lambda *a, **k: None)
|
|
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", gate_applies, raising=False)
|
|
|
|
enq = []
|
|
monkeypatch.setattr(plane, "enqueue_job", lambda *a, **k: (enq.append((a, k)) or 99))
|
|
|
|
branch_calls, docs_calls = [], []
|
|
|
|
async def _branch_spy(repo, branch):
|
|
branch_calls.append((repo, branch))
|
|
|
|
async def _docs_spy(repo, branch, wi, name):
|
|
docs_calls.append((repo, branch, wi, name))
|
|
|
|
monkeypatch.setattr(plane, "_create_gitea_branch", _branch_spy)
|
|
monkeypatch.setattr(plane, "_create_initial_docs", _docs_spy)
|
|
|
|
data = {
|
|
"id": "issue-uuid-1",
|
|
"name": "Add serial gate",
|
|
"description_stripped": "A sufficiently long description for QG-0 to pass.",
|
|
"project": "proj-uuid",
|
|
}
|
|
await plane.start_pipeline(data, project_id="proj-uuid")
|
|
return branch_calls, docs_calls, enq
|
|
|
|
|
|
def test_branch_cut_deferred_when_gate_applies(monkeypatch):
|
|
import asyncio
|
|
branch_calls, docs_calls, enq = asyncio.run(
|
|
_drive_start_pipeline(monkeypatch, gate_applies=True)
|
|
)
|
|
assert branch_calls == [], "branch must NOT be cut in start_pipeline while gated"
|
|
assert docs_calls == [], "initial docs must NOT be created while gated"
|
|
# The analyst-job is still enqueued (it waits in the queue without a branch).
|
|
assert any(a[0] == "analyst" for a, k in enq), "analyst-job must still be enqueued"
|
|
|
|
|
|
def test_branch_cut_immediate_when_kill_switch_off(monkeypatch):
|
|
import asyncio
|
|
branch_calls, docs_calls, enq = asyncio.run(
|
|
_drive_start_pipeline(monkeypatch, gate_applies=False)
|
|
)
|
|
assert branch_calls, "with the gate off the branch is cut in start_pipeline (1:1)"
|
|
assert docs_calls, "with the gate off initial docs are created in start_pipeline"
|
|
|
|
|
|
# --------------------------------------------------------------- TC-14
|
|
def _git(*args, cwd):
|
|
env = {
|
|
**os.environ,
|
|
"GIT_AUTHOR_NAME": "t", "GIT_AUTHOR_EMAIL": "t@t",
|
|
"GIT_COMMITTER_NAME": "t", "GIT_COMMITTER_EMAIL": "t@t",
|
|
}
|
|
return subprocess.run(["git", *args], cwd=cwd, env=env,
|
|
capture_output=True, text=True, check=True)
|
|
|
|
|
|
def test_deferred_branch_base_contains_predecessor(tmp_path, monkeypatch):
|
|
"""A branch cut at claim time is based on a fresh origin/main with A's code."""
|
|
from src import git_worktree
|
|
|
|
origin = tmp_path / "origin.git"
|
|
origin.mkdir()
|
|
_git("init", "--bare", "-b", "main", str(origin), cwd=tmp_path)
|
|
|
|
repos_dir = tmp_path / "repos"
|
|
wt_dir = tmp_path / "wt"
|
|
repos_dir.mkdir()
|
|
wt_dir.mkdir()
|
|
repo = "orchestrator"
|
|
clone = repos_dir / repo
|
|
_git("clone", str(origin), str(clone), cwd=tmp_path)
|
|
|
|
# Predecessor A: commit on main + push to origin (== "A merged at its done").
|
|
(clone / "a.txt").write_text("A's code\n")
|
|
_git("add", "a.txt", cwd=clone)
|
|
_git("commit", "-m", "task A", cwd=clone)
|
|
_git("push", "origin", "main", cwd=clone)
|
|
sha_a = _git("rev-parse", "HEAD", cwd=clone).stdout.strip()
|
|
|
|
monkeypatch.setattr(git_worktree.settings, "repos_dir", str(repos_dir), raising=False)
|
|
monkeypatch.setattr(git_worktree.settings, "worktrees_dir", str(wt_dir), raising=False)
|
|
|
|
# Branch B does not exist yet -> ensure_worktree cuts it from fresh origin/main.
|
|
wt = git_worktree.ensure_worktree(repo, "feature/ORCH-B")
|
|
head_b = _git("rev-parse", "HEAD", cwd=wt).stdout.strip()
|
|
|
|
# AC-6: A's commit is an ancestor of B's base.
|
|
r = subprocess.run(
|
|
["git", "-C", wt, "merge-base", "--is-ancestor", sha_a, head_b],
|
|
capture_output=True,
|
|
)
|
|
assert r.returncode == 0, "branch B base must contain predecessor A's commit (AC-6)"
|