Этап 1 (serial e2e) пакетного автономного режима.

Новая задача репо не входит в analysis (analyst-job не выбирается, ветка не режется), пока в репо есть более ранняя незавершённая задача (FIFO, t2.id < jobs.task_id) ИЛИ репо заморожен.

- src/serial_gate.py — новый leaf (never-raise): build_claim_clause (fail-OPEN), is_repo_frozen (fail-CLOSED), set/clear_repo_freeze, serial_gate_applies, snapshot.
- src/db.py — идемпотентная миграция repo_freeze + serial_gate-фрагмент в claim_next_job.
- src/webhooks/plane.py + src/agents/launcher.py — отложенный срез ветки: start_pipeline не создаёт Gitea-ветку/docs для применимого репо; релокация в _materialize_deferred_branch на момент claim analyst-job (база = свежий origin/main с кодом предшественника, AC-6).
- src/stage_engine.py — post-deploy DEGRADED → durable per-repo freeze + Telegram-алерт.
- src/main.py — блок serial_gate в GET /queue + POST /serial-gate/unfreeze.
- src/config.py — serial_gate_enabled / serial_gate_repos / serial_gate_freeze_enabled.

FIFO-уточнение реализации (FR-2): ADR-001 D1 фиксировал t2.id != jobs.task_id; при != пакет одновременно созданных свежих задач взаимно блокировался бы (дедлок). t2.id < jobs.task_id допускает самую раннюю задачу и сериализует остальные, сохраняя AC-1/R-7.

STAGE_TRANSITIONS / QG_CHECKS / check_* — без изменений. Аддитивно, под kill-switch, never-raise, restart-safe; при выключенном флаге — нулевая регрессия (enduro не затронут).

Тесты: TC-01..TC-22 (test_serial_gate*.py + test_queue_endpoint.py); полный прогон 1114 зелёных.
Docs: README (serial gate / /queue / API / БД), CLAUDE.md, CHANGELOG.md, .env.example.

Refs: ORCH-088
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
114 lines
4.4 KiB
Python
114 lines
4.4 KiB
Python
"""ORCH-088 — serial gate end-to-end queue behaviour (real tmp SQLite).
|
|
|
|
Covers (04-test-plan.yaml):
|
|
TC-04 after A.stage='done' the waiting analyst-job of B is claimed (gate opens
|
|
automatically — no manual action).
|
|
TC-05 a queue of 3 tasks of one repo is processed strictly one-at-a-time, FIFO
|
|
by jobs.id: while A is unfinished neither B nor C starts.
|
|
TC-06 restart-safe: the active task is derived from the DB (tasks.repo +
|
|
stage!='done'), not in-memory — re-reading state keeps the gate closed.
|
|
"""
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_serial_gate_e2e.db")
|
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
|
|
import src.db as db # noqa: E402
|
|
from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402
|
|
from src import config as cfg # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test its own freshly initialised SQLite DB with the gate on.

    Steps, in order:
    - point ``db.settings.db_path`` at ``<tmp_path>/e2e.db``;
    - force the serial-gate flags on and disable task dependencies;
    - run ``init_db()``.

    ``serial_gate_repos`` is set to "" (NOTE(review): presumably "all repos" —
    confirm in src/config.py). ``monkeypatch`` undoes every override at
    teardown.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "e2e.db"))

    overrides = (
        ("serial_gate_enabled", True),
        ("serial_gate_repos", ""),
        ("serial_gate_freeze_enabled", True),
        ("task_deps_enabled", False),
    )
    for attr_name, forced_value in overrides:
        # raising=False: set the attribute even if ``settings`` lacks it.
        monkeypatch.setattr(cfg.settings, attr_name, forced_value, raising=False)

    init_db()
    yield
def _make_task(work_item_id, stage="analysis", repo="orchestrator"):
    """Insert a ``tasks`` row directly and return its id (``lastrowid``).

    ``work_item_id`` is stored as both ``plane_id`` and ``work_item_id``. The
    branch is ``feature/<work_item_id>``. The connection is closed even when
    the INSERT or the commit raises, so a failing helper call cannot leave a
    stray SQLite handle open for the rest of the test.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        conn.close()
def _set_stage(task_id, stage):
    """Overwrite ``tasks.stage`` for ``task_id`` (e.g. 'done' to open the gate).

    The connection is closed even when the UPDATE or the commit raises.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id))
        conn.commit()
    finally:
        conn.close()
# --------------------------------------------------------------- TC-04
def test_next_starts_automatically_when_predecessor_done():
    """B's waiting analyst job becomes claimable once predecessor A is done."""
    predecessor = _make_task("ORCH-301", stage="development")
    successor = _make_task("ORCH-302", stage="analysis")
    successor_job = enqueue_job("analyst", "orchestrator", "B", task_id=successor)

    assert claim_next_job() is None, "B gated while A unfinished"

    # Moving A to 'done' is the only action taken: the very next claim must
    # pick up B without any manual unfreeze or re-enqueue.
    _set_stage(predecessor, "done")
    picked = claim_next_job()
    assert picked is not None
    assert picked["id"] == successor_job
# --------------------------------------------------------------- TC-05
def test_three_tasks_processed_one_at_a_time_fifo():
    """Three tasks of one repo are admitted strictly one at a time, FIFO.

    The gate is keyed on the predecessor *task* leaving the unfinished set
    (stage != 'done'), not on its job finishing. So ``mark_job(..., "done")``
    alone must leave the next task gated until ``_set_stage(..., "done")``.
    """
    a = _make_task("ORCH-310", stage="analysis")
    b = _make_task("ORCH-311", stage="analysis")
    c = _make_task("ORCH-312", stage="analysis")
    job_a = enqueue_job("analyst", "orchestrator", "A", task_id=a)
    job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b)
    job_c = enqueue_job("analyst", "orchestrator", "C", task_id=c)

    # Only the FIFO-first task (A, lowest id) is claimable.
    claimed_a = claim_next_job()
    assert claimed_a is not None
    assert claimed_a["id"] == job_a
    assert claim_next_job() is None, "B and C must wait while A is unfinished"

    # A's analyst job finishing does not finish task A: B and C stay gated.
    db.mark_job(job_a, "done")
    assert claim_next_job() is None, "A's job is done but task A is not: B and C must wait"

    # Task A reaches done; now B (next) is claimable, C still waits.
    _set_stage(a, "done")
    claimed_b = claim_next_job()
    assert claimed_b is not None
    assert claimed_b["id"] == job_b
    assert claim_next_job() is None, "C must wait while B is unfinished"

    # B's job finishing alone does not release C either.
    db.mark_job(job_b, "done")
    assert claim_next_job() is None, "B's job is done but task B is not: C must wait"

    # Task B done -> C claimable last (strict FIFO order preserved).
    _set_stage(b, "done")
    claimed_c = claim_next_job()
    assert claimed_c is not None
    assert claimed_c["id"] == job_c
# --------------------------------------------------------------- TC-06
def test_restart_safe_active_task_from_db():
    """The gate is recomputed from the DB, so a simulated restart keeps it closed."""
    active = _make_task("ORCH-320", stage="development")
    waiting = _make_task("ORCH-321", stage="analysis")
    waiting_job = enqueue_job("analyst", "orchestrator", "B", task_id=waiting)
    assert claim_next_job() is None

    # "Restart": there is no in-memory gate state to carry over. Re-running the
    # idempotent init_db() and claiming afresh must still treat A as active.
    init_db()
    assert claim_next_job() is None, "after restart the gate is still closed (DB-derived)"

    _set_stage(active, "done")
    picked = claim_next_job()
    assert picked is not None
    assert picked["id"] == waiting_job