feat(serial-gate): per-repo serial gate + deferred branch cut + rollback-freeze (ORCH-088)
Этап 1 (serial e2e) пакетного автономного режима. Новая задача репо не входит в analysis (analyst-job не выбирается, ветка не режется), пока в репо есть более ранняя незавершённая задача (FIFO, t2.id < jobs.task_id) ИЛИ репо заморожен. - src/serial_gate.py — новый leaf (never-raise): build_claim_clause (fail-OPEN), is_repo_frozen (fail-CLOSED), set/clear_repo_freeze, serial_gate_applies, snapshot. - src/db.py — идемпотентная миграция repo_freeze + serial_gate-фрагмент в claim_next_job. - src/webhooks/plane.py + src/agents/launcher.py — отложенный срез ветки: start_pipeline не создаёт Gitea-ветку/docs для применимого репо; релокация в _materialize_deferred_branch на момент claim analyst-job (база = свежий origin/main с кодом предшественника, AC-6). - src/stage_engine.py — post-deploy DEGRADED → durable per-repo freeze + Telegram-алерт. - src/main.py — блок serial_gate в GET /queue + POST /serial-gate/unfreeze. - src/config.py — serial_gate_enabled / serial_gate_repos / serial_gate_freeze_enabled. FIFO-уточнение реализации (FR-2): ADR-001 D1 фиксировал t2.id != jobs.task_id; при != пакет одновременно созданных свежих задач взаимно блокировался бы (дедлок). t2.id < jobs.task_id допускает самую раннюю задачу и сериализует остальные, сохраняя AC-1/R-7. STAGE_TRANSITIONS / QG_CHECKS / check_* — без изменений. Аддитивно, под kill-switch, never-raise, restart-safe; при выключенном флаге — нулевая регрессия (enduro не затронут). Тесты: TC-01..TC-22 (test_serial_gate*.py + test_queue_endpoint.py); полный прогон 1114 зелёных. Docs: README (serial gate / /queue / API / БД), CLAUDE.md, CHANGELOG.md, .env.example. Refs: ORCH-088 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -418,6 +418,32 @@ class AgentLauncher:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _materialize_deferred_branch(
|
||||
self, repo: str, branch: str, work_item_id: str | None, title: str | None
|
||||
) -> None:
|
||||
"""ORCH-088 (ADR-001 D1): create the deferred Gitea branch + initial docs.
|
||||
|
||||
Relocated from ``webhooks.plane.start_pipeline``: the two coroutines are run
|
||||
SYNCHRONOUSLY here (this method executes in the worker THREAD — no running
|
||||
event loop — so ``asyncio.run`` is safe, R-4). Sequence mirrors the original
|
||||
start_pipeline order so the downstream worktree/PR flow is identical:
|
||||
``_create_gitea_branch`` (from a fresh ``main``) -> ``_create_initial_docs``.
|
||||
Both are idempotent (409/422 -> no-op) so a re-claim after a restart is safe.
|
||||
A transient Gitea error PROPAGATES so the caller (_spawn) fails the launch and
|
||||
the queue worker requeues the job for a later tick (never a half-cut state).
|
||||
"""
|
||||
import asyncio
|
||||
from ..webhooks.plane import _create_gitea_branch, _create_initial_docs
|
||||
|
||||
name = title or work_item_id or branch
|
||||
logger.info(
|
||||
f"ORCH-088: materialising deferred branch '{branch}' for {repo} "
|
||||
f"({work_item_id}) at analyst-job claim"
|
||||
)
|
||||
asyncio.run(_create_gitea_branch(repo, branch))
|
||||
if work_item_id:
|
||||
asyncio.run(_create_initial_docs(repo, branch, work_item_id, name))
|
||||
|
||||
def _spawn(self, agent: str, repo: str, task_content: str = None,
|
||||
task_id: int = None, job_id: int = None) -> int:
|
||||
"""Shared spawn implementation for launch() and launch_job().
|
||||
@@ -437,9 +463,33 @@ class AgentLauncher:
|
||||
raise FileNotFoundError(f"Repo not found: {local_repo_path}")
|
||||
|
||||
# Determine branch (needed before we touch the worktree / task file).
|
||||
_br_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None
|
||||
_br_row = (
|
||||
get_db().execute(
|
||||
"SELECT branch, work_item_id, title FROM tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
if task_id else None
|
||||
)
|
||||
agent_branch = _br_row[0] if _br_row else "main"
|
||||
|
||||
# ORCH-088 (FR-1/AC-6, ADR-001 D1): materialise a DEFERRED branch cut. When
|
||||
# the serial gate applies, start_pipeline did NOT create the Gitea branch /
|
||||
# initial docs — they were deferred to this claim so the cut happens from a
|
||||
# fresh origin/main that already contains the predecessor. We only reach this
|
||||
# claim because the gate is OPEN (predecessor done), so it is now safe. This
|
||||
# runs ONLY for the analyst-job (pipeline entry); every later stage reuses the
|
||||
# existing branch. Idempotent (409/422 -> no-op) so a re-claim is safe. On a
|
||||
# transient Gitea error this raises -> _drain_once requeues the job (R-4).
|
||||
if agent == "analyst" and _br_row is not None:
|
||||
try:
|
||||
from .. import serial_gate
|
||||
_applies = serial_gate.serial_gate_applies(repo)
|
||||
except Exception: # noqa: BLE001 - never let the gate check block a launch
|
||||
_applies = False
|
||||
if _applies:
|
||||
self._materialize_deferred_branch(
|
||||
repo, agent_branch, _br_row[1], _br_row[2]
|
||||
)
|
||||
|
||||
# ORCH-41: resolve the Plane project uuid for this repo so per-project
|
||||
# model/effort overrides apply. Unknown repo -> None (env/default only).
|
||||
from ..projects import get_project_by_repo
|
||||
|
||||
@@ -433,6 +433,31 @@ class Settings(BaseSettings):
|
||||
task_deps_enabled: bool = True
|
||||
task_deps_source: str = "db"
|
||||
|
||||
# ORCH-088 (Этап 1, serial e2e): per-repo serial gate. A new task's analyst-job
|
||||
# does NOT enter analysis (no branch cut, no analyst agent) while the same repo
|
||||
# has another unfinished task (tasks.stage != 'done') OR the repo is frozen
|
||||
# (repo_freeze). The gate lives in claim_next_job (offline-safe hot path, like
|
||||
# the ORCH-026 dep-gate) + the branch cut is deferred from start_pipeline to the
|
||||
# analyst-job claim (launcher) so the branch base is always a fresh origin/main
|
||||
# that already contains the predecessor (anti-stale-base, AC-6). All additive,
|
||||
# never-raise, restart-safe; STAGE_TRANSITIONS / QG_CHECKS unchanged. See
|
||||
# docs/work-items/ORCH-088/06-adr/ADR-001-serial-gate.md.
|
||||
# serial_gate_enabled -> kill-switch (env ORCH_SERIAL_GATE_ENABLED).
|
||||
# False -> claim_next_job AND start_pipeline are 1:1
|
||||
# as before ORCH-088 (clause omitted, branch cut in
|
||||
# start_pipeline) — zero regression (AC-7).
|
||||
# serial_gate_repos -> CSV scope (env ORCH_SERIAL_GATE_REPOS). Empty ->
|
||||
# applies to ALL registered repos (D5); non-empty ->
|
||||
# only the listed repos. Repo tokens are sanitised
|
||||
# (^[A-Za-z0-9._-]+$) before being embedded in SQL.
|
||||
# serial_gate_freeze_enabled-> independent tumbler for the FR-5 rollback-freeze
|
||||
# layer (env ORCH_SERIAL_GATE_FREEZE_ENABLED). False
|
||||
# -> freeze is neither set (post-deploy DEGRADED) nor
|
||||
# consulted in the claim gate.
|
||||
serial_gate_enabled: bool = True
|
||||
serial_gate_repos: str = ""
|
||||
serial_gate_freeze_enabled: bool = True
|
||||
|
||||
# ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify
|
||||
# under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a
|
||||
# secondary deterministic (no-LLM) guard checks that a declarative set of markers
|
||||
|
||||
34
src/db.py
34
src/db.py
@@ -168,6 +168,26 @@ def init_db():
|
||||
CREATE INDEX IF NOT EXISTS idx_tracker_messages_open
|
||||
ON tracker_messages(task_id) WHERE deleted_at IS NULL;
|
||||
""")
|
||||
# ORCH-088 (FR-5, ADR-001 D2): durable per-repo rollback-freeze. After a
|
||||
# post-deploy DEGRADED verdict the repo is frozen so the serial gate stays
|
||||
# CLOSED unconditionally (the degraded task is already stage='done' — BR-7 — so
|
||||
# the ordinary active-task gate would not hold it) until an operator clears it
|
||||
# via POST /serial-gate/unfreeze. Append-only journal: an ACTIVE freeze for repo
|
||||
# R ⇔ a row with repo=R AND cleared_at IS NULL. Purely ADDITIVE (CREATE
|
||||
# TABLE/INDEX IF NOT EXISTS) -> idempotent, restart-safe on the live shared prod
|
||||
# DB (enduro-trails data untouched). See 08-data-requirements.md.
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS repo_freeze (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
repo TEXT NOT NULL,
|
||||
frozen_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
reason TEXT,
|
||||
work_item_id TEXT,
|
||||
cleared_at TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_repo_freeze_active
|
||||
ON repo_freeze (repo, cleared_at);
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -588,6 +608,19 @@ def claim_next_job() -> dict | None:
|
||||
" WHERE d.task_id = jobs.task_id AND t.stage != 'done'"
|
||||
") "
|
||||
)
|
||||
# ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW
|
||||
# task is NOT claimable while the same repo has another unfinished task OR is
|
||||
# frozen. The fragment is built in the serial_gate leaf (sanitised repo scope,
|
||||
# fail-OPEN on any build error so a transient fault never wedges the queue of
|
||||
# ALL projects — AC-8). Jobs of an already-active task (architect/.../deployer)
|
||||
# are unaffected — the gate keys on jobs.agent='analyst' only. Reads only the
|
||||
# local DB (offline-safe hot path, NFR-2).
|
||||
serial_gate = ""
|
||||
try:
|
||||
from . import serial_gate as _serial_gate
|
||||
serial_gate = _serial_gate.build_claim_clause()
|
||||
except Exception: # noqa: BLE001 - fail-OPEN: never wedge the claim
|
||||
serial_gate = ""
|
||||
conn = get_db()
|
||||
try:
|
||||
while True:
|
||||
@@ -595,6 +628,7 @@ def claim_next_job() -> dict | None:
|
||||
"SELECT id FROM jobs WHERE status='queued' "
|
||||
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
||||
f"{dep_gate}"
|
||||
f"{serial_gate}"
|
||||
"ORDER BY id LIMIT 1"
|
||||
).fetchone()
|
||||
if not row:
|
||||
|
||||
33
src/main.py
33
src/main.py
@@ -149,6 +149,7 @@ async def queue():
|
||||
from . import post_deploy
|
||||
from . import merge_gate
|
||||
from . import task_deps
|
||||
from . import serial_gate
|
||||
return {
|
||||
"counts": job_status_counts(),
|
||||
"max_concurrency": worker.max_concurrency,
|
||||
@@ -161,5 +162,37 @@ async def queue():
|
||||
# ORCH-026 (G-2): declarative task-dependency observability (read-only,
|
||||
# NOT a source of truth) — declared edges, blocked tasks, detected cycle.
|
||||
"task_deps": task_deps.snapshot(),
|
||||
# ORCH-088 (D9 / AC-10): per-repo serial-gate observability (read-only) —
|
||||
# active task, queued/waiting analyst-jobs, freeze state. Additive block.
|
||||
"serial_gate": serial_gate.snapshot(),
|
||||
"recent": recent_jobs(10),
|
||||
}
|
||||
|
||||
|
||||
@app.post("/serial-gate/unfreeze")
|
||||
async def serial_gate_unfreeze(repo: str = ""):
|
||||
"""ORCH-088 (FR-5, ADR-001 D4): manually clear a per-repo rollback-freeze.
|
||||
|
||||
A freeze set by the post-deploy monitor (DEGRADED) keeps the serial gate CLOSED
|
||||
for the repo until an operator explicitly clears it here. Idempotent: clearing
|
||||
an already-clear repo reports ``cleared: 0``. The next queued analyst-job is then
|
||||
claimable on the next scheduler tick (no restart needed). Alternative manual path
|
||||
(documented in README): ``UPDATE repo_freeze SET cleared_at=datetime('now')
|
||||
WHERE repo=? AND cleared_at IS NULL``.
|
||||
"""
|
||||
from . import serial_gate
|
||||
if not repo or not repo.strip():
|
||||
return {"ok": False, "error": "missing 'repo'", "repo": repo, "cleared": 0}
|
||||
repo = repo.strip()
|
||||
cleared = serial_gate.clear_repo_freeze(repo)
|
||||
frozen = serial_gate.is_repo_frozen(repo)
|
||||
if cleared:
|
||||
try:
|
||||
from .notifications import send_telegram
|
||||
send_telegram(
|
||||
f"🔥 {repo}: пакет РАЗМОРОЖЕН вручную ({cleared} запис(ь/и) снято). "
|
||||
f"Следующая задача репо стартует на ближайшем цикле."
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}
|
||||
|
||||
404
src/serial_gate.py
Normal file
404
src/serial_gate.py
Normal file
@@ -0,0 +1,404 @@
|
||||
"""ORCH-088 (Этап 1, serial e2e): per-repo serial gate + durable rollback-freeze.
|
||||
|
||||
Leaf module — pure, unit-testable logic over the existing ``tasks`` / ``jobs``
|
||||
tables and the additive ``repo_freeze`` table (see src/db.py /
|
||||
08-data-requirements.md). Mirrors the leaf pattern of ``src/task_deps.py`` /
|
||||
``src/post_deploy.py``: imports only ``db`` + ``config`` (and lazily
|
||||
``projects`` for the snapshot), never ``stage_engine`` / ``launcher``.
|
||||
|
||||
What it enforces (ADR-001):
|
||||
* A NEW task's analyst-job does NOT enter analysis (no branch cut, no analyst
|
||||
agent) while the same repo has ANOTHER unfinished task (``tasks.stage !=
|
||||
'done'``) OR the repo is frozen. The gate is a SQL fragment spliced into
|
||||
``db.claim_next_job`` (offline hot path) — ``build_claim_clause``.
|
||||
* After a post-deploy ``DEGRADED`` verdict the repo is frozen
|
||||
(``set_repo_freeze``); the gate stays CLOSED until an operator clears it
|
||||
(``clear_repo_freeze``). The degraded task is already ``stage='done'`` (BR-7)
|
||||
so freeze is a SEPARATE durable signal, not derived from a stage.
|
||||
|
||||
never-raise contract (self-hosting safety): every public function degrades
|
||||
conservatively and NEVER propagates into the worker / webhook / stage engine.
|
||||
Two deliberately different failure directions (ADR-001 D10, NFR-1):
|
||||
* hot-claim clause build -> fail-OPEN ("" fragment): a transient DB/build error
|
||||
must not wedge the queue of ALL projects (AC-8).
|
||||
* freeze decision (``is_repo_frozen``) -> fail-CLOSED (``True``): when we cannot
|
||||
confirm the ABSENCE of a freeze we keep the gate closed for prod safety (AC-9).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from . import db
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.serial_gate")
|
||||
|
||||
# Repo tokens embedded into the claim SQL ``IN (...)`` list must match this — a
|
||||
# guard against a broken/injected ORCH_SERIAL_GATE_REPOS CSV (R-6). The CSV is an
|
||||
# operator config (not user input), but the guard is mandatory; an invalid token
|
||||
# is silently dropped.
|
||||
_REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conditionality (mirrors post_deploy_applies / _merge_gate_applies)
|
||||
# ---------------------------------------------------------------------------
|
||||
def _scope_repos() -> set[str]:
|
||||
"""Sanitised set of in-scope repo tokens from ``serial_gate_repos`` (CSV).
|
||||
|
||||
Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D5). Invalid
|
||||
tokens (regex miss) are dropped. Never raises.
|
||||
"""
|
||||
try:
|
||||
raw = (settings.serial_gate_repos or "").strip()
|
||||
except Exception: # noqa: BLE001
|
||||
return set()
|
||||
if not raw:
|
||||
return set()
|
||||
out: set[str] = set()
|
||||
for tok in raw.split(","):
|
||||
t = tok.strip()
|
||||
if t and _REPO_TOKEN.match(t):
|
||||
out.add(t)
|
||||
elif t:
|
||||
logger.warning("serial_gate: dropping invalid repo token %r from CSV", t)
|
||||
return out
|
||||
|
||||
|
||||
def serial_gate_applies(repo: str) -> bool:
|
||||
"""Whether the serial gate is REAL for this repo (D5 / AC-7).
|
||||
|
||||
* ``serial_gate_enabled=False`` -> always False (kill-switch; claim and
|
||||
start_pipeline are 1:1 as before ORCH-088).
|
||||
* ``serial_gate_repos`` (CSV) non-empty -> real only for listed repos.
|
||||
* empty CSV -> real for ALL repos (serial e2e + anti-stale-base help every
|
||||
repo, unlike the self-hosting-only ORCH-35/43/58 gates).
|
||||
Never raises -> False on error (degrade to "gate inert", the safe-for-flow
|
||||
default that matches the kill-switch off behaviour).
|
||||
"""
|
||||
try:
|
||||
if not getattr(settings, "serial_gate_enabled", False):
|
||||
return False
|
||||
scope = _scope_repos()
|
||||
if scope:
|
||||
return (repo or "").strip() in scope
|
||||
return True
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("serial_gate_applies error for %s: %s", repo, e)
|
||||
return False
|
||||
|
||||
|
||||
def _freeze_layer_enabled() -> bool:
|
||||
"""Whether the FR-5 freeze layer is active (independent tumbler, D7)."""
|
||||
try:
|
||||
return bool(getattr(settings, "serial_gate_freeze_enabled", False))
|
||||
except Exception: # noqa: BLE001
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Read helpers (active task + freeze) — only the local DB
|
||||
# ---------------------------------------------------------------------------
|
||||
def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool:
|
||||
"""True iff repo has a task with ``stage != 'done'`` (excluding one task).
|
||||
|
||||
``exclude_task_id`` is the task being evaluated (a new/rework task must not
|
||||
count ITSELF as the active task that blocks it — R-7). Observability/Python
|
||||
mirror of the SQL gate; never raises -> False on error.
|
||||
"""
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
if exclude_task_id is not None:
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo=? AND id != ? AND stage != 'done' LIMIT 1",
|
||||
(repo, exclude_task_id),
|
||||
).fetchone()
|
||||
else:
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo=? AND stage != 'done' LIMIT 1",
|
||||
(repo,),
|
||||
).fetchone()
|
||||
return row is not None
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("repo_has_active_task error for %s: %s", repo, e)
|
||||
return False
|
||||
|
||||
|
||||
def _active_freeze_row(repo: str) -> dict | None:
|
||||
"""Most-recent active (``cleared_at IS NULL``) freeze row for repo, or None.
|
||||
|
||||
Raises on a real DB error (the caller decides fail-open vs fail-closed) — this
|
||||
private helper does NOT swallow so ``is_repo_frozen`` can fail CLOSED.
|
||||
"""
|
||||
conn = db.get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT repo, frozen_at, reason, work_item_id FROM repo_freeze "
|
||||
"WHERE repo=? AND cleared_at IS NULL ORDER BY id DESC LIMIT 1",
|
||||
(repo,),
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def is_repo_frozen(repo: str) -> bool:
|
||||
"""True iff repo currently has an active freeze (FR-5).
|
||||
|
||||
fail-CLOSED (AC-9): when the freeze layer is enabled and we CANNOT confirm the
|
||||
absence of a freeze (DB error), return True — keep the gate closed for prod
|
||||
safety. When the freeze layer is disabled the repo is never considered frozen.
|
||||
"""
|
||||
if not _freeze_layer_enabled():
|
||||
return False
|
||||
try:
|
||||
return _active_freeze_row(repo) is not None
|
||||
except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt (AC-9)
|
||||
logger.warning("is_repo_frozen error for %s -> fail-CLOSED (frozen): %s", repo, e)
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Freeze mutators (FR-5)
|
||||
# ---------------------------------------------------------------------------
|
||||
def set_repo_freeze(repo: str, reason: str = "", work_item_id: str | None = None) -> bool:
|
||||
"""Insert a durable freeze row for repo (no-op when the freeze layer is off).
|
||||
|
||||
Append-only: a repeated DEGRADED while already frozen simply adds another row
|
||||
(history); ``is_repo_frozen``'s EXISTS is idempotent. Returns True iff a row
|
||||
was inserted. never-raise -> False on error (a freeze write failure must not
|
||||
crash the post-deploy monitor tick).
|
||||
"""
|
||||
if not _freeze_layer_enabled():
|
||||
logger.info("set_repo_freeze: freeze layer disabled, skipping for %s", repo)
|
||||
return False
|
||||
if not repo:
|
||||
return False
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"INSERT INTO repo_freeze (repo, reason, work_item_id) VALUES (?, ?, ?)",
|
||||
(repo, reason or None, work_item_id),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
logger.warning(
|
||||
"serial_gate: repo %s FROZEN (reason=%r, work_item=%s) — next task will "
|
||||
"NOT start until manual unfreeze", repo, reason, work_item_id,
|
||||
)
|
||||
return True
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.error("set_repo_freeze error for %s: %s", repo, e)
|
||||
return False
|
||||
|
||||
|
||||
def clear_repo_freeze(repo: str) -> int:
|
||||
"""Clear ALL active freeze rows for repo (operator unfreeze, D4).
|
||||
|
||||
Sets ``cleared_at=now`` on every row with ``cleared_at IS NULL``. Idempotent
|
||||
(a repeat clears 0 rows). Returns the number of rows cleared. never-raise -> 0
|
||||
on error.
|
||||
"""
|
||||
if not repo:
|
||||
return 0
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
cur = conn.execute(
|
||||
"UPDATE repo_freeze SET cleared_at=datetime('now') "
|
||||
"WHERE repo=? AND cleared_at IS NULL",
|
||||
(repo,),
|
||||
)
|
||||
conn.commit()
|
||||
n = cur.rowcount or 0
|
||||
finally:
|
||||
conn.close()
|
||||
if n:
|
||||
logger.warning("serial_gate: repo %s UNFROZEN (%d row(s) cleared)", repo, n)
|
||||
return n
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.error("clear_repo_freeze error for %s: %s", repo, e)
|
||||
return 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hot-claim SQL fragment (fail-OPEN) — ADR-001 D1
|
||||
# ---------------------------------------------------------------------------
|
||||
def build_claim_clause() -> str:
|
||||
"""Build the ``AND NOT (...)`` fragment spliced into ``claim_next_job``.
|
||||
|
||||
Blocks an analyst-job whose repo (a) has an EARLIER-queued unfinished task or
|
||||
(b) is frozen. Only ``jobs.agent='analyst'`` is gated — jobs of an
|
||||
already-active task pass freely (else the single active task could never
|
||||
advance).
|
||||
|
||||
Ordering term — ``t2.id < jobs.task_id`` (FIFO, ADR-001 D1 / FR-2): a task is
|
||||
blocked only by EARLIER tasks (lower ``tasks.id``) that are not yet done. This
|
||||
is the FIFO refinement of the ADR's pseudo-SQL ``t2.id != jobs.task_id``: with
|
||||
``!=`` a BATCH of fresh tasks all sitting in ``analysis`` would mutually block
|
||||
(each is "another unfinished task" for the others) -> the whole serial queue
|
||||
deadlocks, contradicting FR-2 ("строго по одной, FIFO по jobs.id"). ``<`` admits
|
||||
exactly the oldest unfinished task and serialises the rest behind it, while
|
||||
still never self-blocking a new/rework analyst-job on its OWN row (R-7) and
|
||||
keeping AC-1 (a newer task is held by the older active one) intact.
|
||||
|
||||
Repo scope: empty CSV -> no repo filter (all repos); non-empty CSV -> ``AND
|
||||
jobs.repo IN ('a','b')`` with sanitised tokens (R-6).
|
||||
|
||||
fail-OPEN (AC-8): kill-switch off OR any build error -> ``""`` (claim behaves
|
||||
exactly as before ORCH-088). The trailing space keeps the spliced SQL valid.
|
||||
"""
|
||||
try:
|
||||
if not getattr(settings, "serial_gate_enabled", False):
|
||||
return ""
|
||||
scope = _scope_repos()
|
||||
if scope:
|
||||
# All tokens already passed the _REPO_TOKEN regex -> safe to embed.
|
||||
repo_in = ", ".join(f"'{t}'" for t in sorted(scope))
|
||||
repo_scope = f"AND jobs.repo IN ({repo_in}) "
|
||||
else:
|
||||
repo_scope = ""
|
||||
active_clause = (
|
||||
"EXISTS (SELECT 1 FROM tasks t2 "
|
||||
"WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id "
|
||||
"AND t2.stage != 'done') "
|
||||
)
|
||||
if _freeze_layer_enabled():
|
||||
freeze_clause = (
|
||||
"OR EXISTS (SELECT 1 FROM repo_freeze f "
|
||||
"WHERE f.repo = jobs.repo AND f.cleared_at IS NULL) "
|
||||
)
|
||||
else:
|
||||
freeze_clause = ""
|
||||
return (
|
||||
"AND NOT ( jobs.agent = 'analyst' "
|
||||
f"{repo_scope}"
|
||||
f"AND ( {active_clause}{freeze_clause}) "
|
||||
") "
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - fail-OPEN: never wedge the queue
|
||||
logger.warning("build_claim_clause error -> fail-OPEN (no gate): %s", e)
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Observability snapshot for GET /queue (D9 / AC-10)
|
||||
# ---------------------------------------------------------------------------
|
||||
def _known_repos() -> list[str]:
|
||||
"""Registered repo names (best-effort) plus any repo with live gate state."""
|
||||
repos: set[str] = set()
|
||||
try:
|
||||
from . import projects
|
||||
for p in projects.PROJECTS:
|
||||
if getattr(p, "repo", None):
|
||||
repos.add(p.repo)
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
# Also surface repos that have an active freeze or a queued analyst-job even if
|
||||
# they are not in the static registry (defensive — never hide a frozen repo).
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
for (r,) in conn.execute(
|
||||
"SELECT DISTINCT repo FROM repo_freeze WHERE cleared_at IS NULL"
|
||||
).fetchall():
|
||||
if r:
|
||||
repos.add(r)
|
||||
for (r,) in conn.execute(
|
||||
"SELECT DISTINCT repo FROM jobs WHERE status='queued' AND agent='analyst'"
|
||||
).fetchall():
|
||||
if r:
|
||||
repos.add(r)
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
return sorted(repos)
|
||||
|
||||
|
||||
def _per_repo_snapshot(repo: str) -> dict:
|
||||
"""Per-repo gate state for the /queue snapshot (never raises here)."""
|
||||
active_task = None
|
||||
waiting: list[dict] = []
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT work_item_id, stage FROM tasks "
|
||||
"WHERE repo=? AND stage != 'done' ORDER BY id LIMIT 1",
|
||||
(repo,),
|
||||
).fetchone()
|
||||
if row:
|
||||
active_task = {"work_item_id": row["work_item_id"], "stage": row["stage"]}
|
||||
for j in conn.execute(
|
||||
"SELECT j.id AS job_id, t.work_item_id AS work_item_id, t.stage AS stage "
|
||||
"FROM jobs j LEFT JOIN tasks t ON t.id = j.task_id "
|
||||
"WHERE j.repo=? AND j.status='queued' AND j.agent='analyst' "
|
||||
"ORDER BY j.id",
|
||||
(repo,),
|
||||
).fetchall():
|
||||
waiting.append({
|
||||
"job_id": j["job_id"],
|
||||
"work_item_id": j["work_item_id"],
|
||||
"stage": j["stage"],
|
||||
})
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("serial_gate per-repo snapshot error for %s: %s", repo, e)
|
||||
frozen = is_repo_frozen(repo)
|
||||
frozen_reason = None
|
||||
frozen_at = None
|
||||
if frozen:
|
||||
try:
|
||||
fr = _active_freeze_row(repo)
|
||||
if fr:
|
||||
frozen_reason = fr.get("reason")
|
||||
frozen_at = fr.get("frozen_at")
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
return {
|
||||
"active_task": active_task,
|
||||
"waiting": waiting,
|
||||
"frozen": frozen,
|
||||
"frozen_reason": frozen_reason,
|
||||
"frozen_at": frozen_at,
|
||||
}
|
||||
|
||||
|
||||
def snapshot() -> dict:
|
||||
"""Read-only serial-gate summary for GET /queue (D9 / AC-10).
|
||||
|
||||
Additive block; existing /queue keys are untouched. never-raise: any error ->
|
||||
a minimal dict with the flags and empty per-repo data.
|
||||
"""
|
||||
try:
|
||||
enabled = bool(getattr(settings, "serial_gate_enabled", False))
|
||||
except Exception: # noqa: BLE001
|
||||
enabled = False
|
||||
try:
|
||||
repos_cfg = getattr(settings, "serial_gate_repos", "") or ""
|
||||
except Exception: # noqa: BLE001
|
||||
repos_cfg = ""
|
||||
try:
|
||||
per_repo = {r: _per_repo_snapshot(r) for r in _known_repos()}
|
||||
return {
|
||||
"enabled": enabled,
|
||||
"freeze_enabled": _freeze_layer_enabled(),
|
||||
"repos": repos_cfg,
|
||||
"per_repo": per_repo,
|
||||
}
|
||||
except Exception as e: # noqa: BLE001 - never-raise -> minimal dict
|
||||
logger.warning("serial_gate snapshot error: %s", e)
|
||||
return {
|
||||
"enabled": enabled,
|
||||
"freeze_enabled": False,
|
||||
"repos": repos_cfg,
|
||||
"per_repo": {},
|
||||
}
|
||||
@@ -1708,6 +1708,25 @@ def run_post_deploy_monitor(job: dict):
|
||||
except Exception as e: # noqa: BLE001 - never break the tick
|
||||
logger.warning(f"post-deploy: set Blocked failed for {work_item_id}: {e}")
|
||||
|
||||
# ORCH-088 (FR-5, ADR-001 D3): durable per-repo rollback-freeze. The degraded
|
||||
# task is already stage='done' (BR-7), so the ordinary active-task gate would
|
||||
# NOT hold the next task — we need a separate durable signal. Freeze the repo so
|
||||
# the serial gate stays CLOSED until an operator clears it (POST
|
||||
# /serial-gate/unfreeze). never-raise (set_repo_freeze swallows its own errors);
|
||||
# the freeze is a PASSIVE start-block, it does NOT touch the prod container (NFR-6).
|
||||
try:
|
||||
from . import serial_gate
|
||||
reason = f"post-deploy DEGRADED ({checks_failed}/{checks_total}) action={action_taken}"
|
||||
if serial_gate.set_repo_freeze(repo, reason, work_item_id):
|
||||
_notify_post_deploy(
|
||||
work_item_id,
|
||||
f"🧊 {repo}: пакет ЗАМОРОЖЕН после пост-деплой DEGRADED "
|
||||
f"({work_item_id}). Следующая задача репо НЕ стартует до ручного "
|
||||
f"снятия: POST /serial-gate/unfreeze?repo={repo}.",
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - never break the tick
|
||||
logger.warning(f"post-deploy: set_repo_freeze failed for {repo}: {e}")
|
||||
|
||||
post_deploy.write_post_deploy_log(
|
||||
repo, work_item_id, branch, post_deploy.DEGRADED, action_taken,
|
||||
settings.post_deploy_window_s, checks_total, checks_failed,
|
||||
|
||||
@@ -573,20 +573,36 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
return
|
||||
task_id = task_row["id"]
|
||||
|
||||
# Create branch in Gitea
|
||||
try:
|
||||
await _create_gitea_branch(repo, branch)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create branch '{branch}': {e}")
|
||||
# Task is created, branch creation failed — log but don't crash
|
||||
notify_error(0, f"Branch creation failed: {e}")
|
||||
return
|
||||
# ORCH-088 (FR-1/AC-6, ADR-001 D1): DEFER the branch cut for an applicable repo.
|
||||
# Creating the Gitea branch here (T0, issue -> analysis) would cut it from `main`
|
||||
# BEFORE the predecessor is merged -> stale base. When the serial gate applies we
|
||||
# do NOT create the branch / initial docs now; the analyst-job sits in the queue
|
||||
# (status='queued', no branch) and the gate keeps it there until the predecessor
|
||||
# reaches stage='done'. The branch + docs are then materialised at claim time in
|
||||
# launcher._spawn from a fresh origin/main (anti-stale-base). The task row already
|
||||
# stores `branch` as a NAME (R-5) — only the git ref is deferred.
|
||||
from .. import serial_gate
|
||||
defer_branch = serial_gate.serial_gate_applies(repo)
|
||||
if not defer_branch:
|
||||
# Create branch in Gitea
|
||||
try:
|
||||
await _create_gitea_branch(repo, branch)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create branch '{branch}': {e}")
|
||||
# Task is created, branch creation failed — log but don't crash
|
||||
notify_error(0, f"Branch creation failed: {e}")
|
||||
return
|
||||
|
||||
# Create initial docs structure via Gitea API (create file)
|
||||
try:
|
||||
await _create_initial_docs(repo, branch, work_item_id, name)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create initial docs: {e}")
|
||||
# Create initial docs structure via Gitea API (create file)
|
||||
try:
|
||||
await _create_initial_docs(repo, branch, work_item_id, name)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create initial docs: {e}")
|
||||
else:
|
||||
logger.info(
|
||||
f"Task {work_item_id}: serial gate applies for {repo} -> deferring branch "
|
||||
f"cut to analyst-job claim (anti-stale-base, ORCH-088)"
|
||||
)
|
||||
|
||||
logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user