Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task"
purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses
Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change
tasks.stage (layer A), so a paused predecessor was indistinguishable from an active
one and held the FIFO gate closed against an urgent successor — the urgent fix
could not start until the paused task was formally done.
Introduces an explicit, durable, DB-resolvable per-task "park" signal — additive
nullable column tasks.paused_at (pattern of cancelled_at/track) — and a new
ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes
`stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points
(build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set
{done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged
(adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused
declared dependency and an active repo_freeze STILL block (pause never bypasses
them — different axes). Anti-stale-base on resume relies on the existing deferred
branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test
(ORCH-026/093/110) — no new rebase machinery.
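The predicate change above can be shown against a toy table. This is a minimal sketch, not the project's code. It assumes a `tasks` table that has only the columns the predicate reads. The repo name `orch` and the helper `blocking_predecessor` are hypothetical; the real queries live in `src/serial_gate.py`.

```python
import sqlite3

# Toy schema: only the columns the "active task" predicate reads.
# Assumption: the real tasks table has many more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, repo TEXT NOT NULL, "
    "stage TEXT NOT NULL, paused_at TEXT)"
)

OLD_ACTIVE = "stage NOT IN ('done','cancelled')"
NEW_ACTIVE = "stage NOT IN ('done','cancelled') AND paused_at IS NULL"


def blocking_predecessor(conn, repo, task_id, predicate):
    """Id of the oldest EARLIER task in `repo` matching `predicate`, or None.

    `predicate` is one of the trusted constants above, never user input.
    """
    row = conn.execute(
        f"SELECT id FROM tasks WHERE repo = ? AND id < ? AND {predicate} "
        "ORDER BY id LIMIT 1",
        (repo, task_id),
    ).fetchone()
    return row[0] if row else None


# Task 1 was parked by an operator; task 2 is the urgent successor.
conn.execute("INSERT INTO tasks VALUES (1, 'orch', 'analysis', datetime('now'))")
conn.execute("INSERT INTO tasks VALUES (2, 'orch', 'analysis', NULL)")

print(blocking_predecessor(conn, "orch", 2, OLD_ACTIVE))  # 1    -> paused task holds the gate
print(blocking_predecessor(conn, "orch", 2, NEW_ACTIVE))  # None -> urgent task may start
```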
Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim
fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_*
/ machine-verdict keys / existing table schemas are byte-for-byte untouched (this is
a queue-scheduler + observability change, not a Quality Gate).
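The two preserved failure directions reduce to two try/except shapes. Below is a minimal sketch using hypothetical names, `build_gate_clause` and `is_frozen`. Each takes a callable in place of the real DB lookups, so this mirrors the pattern of `build_claim_clause` and `is_repo_frozen` rather than their actual signatures.

```python
import logging

logger = logging.getLogger("serial_gate_sketch")


def build_gate_clause(build):
    """Fail-OPEN: on any error return "" so the claim query runs ungated."""
    try:
        return build()
    except Exception as e:  # never-raise
        logger.warning("clause build failed -> fail-OPEN: %s", e)
        return ""


def is_frozen(lookup):
    """Fail-CLOSED: if we cannot prove there is no freeze, report frozen."""
    try:
        return lookup() is not None
    except Exception as e:  # never-raise
        logger.warning("freeze lookup failed -> fail-CLOSED: %s", e)
        return True


def boom():
    # Stand-in for a transient DB error.
    raise RuntimeError("database is locked")


print(repr(build_gate_clause(boom)))  # '' -> the queue keeps moving
print(is_frozen(boom))                # True -> the gate stays closed
print(is_frozen(lambda: None))        # False -> no active freeze row
```

The asymmetry is deliberate. A wedged claim query would stall every project. A missed freeze would let a new task start on a repo whose last deploy degraded.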
- src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers
- src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused`
list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot
- src/config.py + .env.example: serial_gate_pause_enabled (default True; a no-op until a task is first paused)
- src/main.py: POST /serial-gate/pause|resume?work_item=<id> (modelled on unfreeze)
- tests/test_orch124_serial_gate_pause.py: TC-01 mandatory incident regress + TC-02..15
- CHANGELOG.md: [Unreleased] entry
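The `src/db.py` bullet mentions an additive column (`_ensure_column`) and set/clear/is helpers, but does not show them. Below is a minimal sketch of that pattern, assuming SQLite. The names `ensure_column`, `set_task_paused`, `clear_task_paused` and `is_task_paused` are hypothetical, as is keying by `work_item_id`. The real signatures may differ. Keeping the first `paused_at` on a repeat pause is also an assumption.

```python
import sqlite3


def ensure_column(conn, table, column, decl):
    """Add `column` to `table` if it is missing (idempotent, additive only).

    `table`/`column`/`decl` must be trusted identifiers: they are interpolated.
    """
    cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column not in cols:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
        conn.commit()


def set_task_paused(conn, work_item_id):
    """Park a task; a repeat call keeps the original timestamp (returns 0)."""
    cur = conn.execute(
        "UPDATE tasks SET paused_at = datetime('now') "
        "WHERE work_item_id = ? AND paused_at IS NULL",
        (work_item_id,),
    )
    conn.commit()
    return cur.rowcount


def clear_task_paused(conn, work_item_id):
    """Resume a task: NULL paused_at so it counts as active again."""
    cur = conn.execute(
        "UPDATE tasks SET paused_at = NULL "
        "WHERE work_item_id = ? AND paused_at IS NOT NULL",
        (work_item_id,),
    )
    conn.commit()
    return cur.rowcount


def is_task_paused(conn, work_item_id):
    row = conn.execute(
        "SELECT paused_at FROM tasks WHERE work_item_id = ?", (work_item_id,)
    ).fetchone()
    return bool(row and row[0] is not None)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, work_item_id TEXT, stage TEXT)")
conn.execute("INSERT INTO tasks (work_item_id, stage) VALUES ('WI-1', 'analysis')")
ensure_column(conn, "tasks", "paused_at", "TEXT")
ensure_column(conn, "tasks", "paused_at", "TEXT")  # second call is a no-op
print(set_task_paused(conn, "WI-1"), is_task_paused(conn, "WI-1"))    # 1 True
print(set_task_paused(conn, "WI-1"))                                  # 0
print(clear_task_paused(conn, "WI-1"), is_task_paused(conn, "WI-1"))  # 1 False
```

The flag lives in a nullable column, so it survives a restart. It is also NULL on every existing row, which is why the default-on sub-flag changes nothing until the first pause.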
ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md
Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md
Refs: ORCH-124
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
527 lines
22 KiB
Python
"""ORCH-088 (Stage 1, serial e2e): per-repo serial gate + durable rollback-freeze.

Leaf module — pure, unit-testable logic over the existing ``tasks`` / ``jobs``
tables and the additive ``repo_freeze`` table (see src/db.py /
08-data-requirements.md). Mirrors the leaf pattern of ``src/task_deps.py`` /
``src/post_deploy.py``: imports only ``db`` + ``config`` (and lazily
``projects`` for the snapshot), never ``stage_engine`` / ``launcher``.

What it enforces (ADR-001):
* A NEW task's analyst-job does NOT enter analysis (no branch cut, no analyst
  agent) while the same repo has ANOTHER unfinished task (``tasks.stage`` not in
  the terminal set ``{done,cancelled}``, ORCH-090) OR the repo is frozen. The
  gate is a SQL fragment spliced into ``db.claim_next_job`` (offline hot path):
  ``build_claim_clause``.
* After a post-deploy ``DEGRADED`` verdict the repo is frozen
  (``set_repo_freeze``); the gate stays CLOSED until an operator clears it
  (``clear_repo_freeze``). The degraded task is already ``stage='done'`` (BR-7)
  so freeze is a SEPARATE durable signal, not derived from a stage.

never-raise contract (self-hosting safety): every public function degrades
conservatively and NEVER propagates into the worker / webhook / stage engine.
Two deliberately different failure directions (ADR-001 D10, NFR-1):
* hot-claim clause build -> fail-OPEN ("" fragment): a transient DB/build error
  must not wedge the queue of ALL projects (AC-8).
* freeze decision (``is_repo_frozen``) -> fail-CLOSED (``True``): when we cannot
  confirm the ABSENCE of a freeze we keep the gate closed for prod safety (AC-9).

ORCH-124 (adr-0051): adds an ORTHOGONAL "pause" axis to the "active task" predicate
of all three points (``build_claim_clause`` / ``repo_has_active_task`` /
``_per_repo_snapshot``). A task with ``tasks.paused_at`` NOT NULL (set by an operator
via ``POST /serial-gate/pause``) is excluded from the FIFO "active" set, so an URGENT
successor may overtake a paused predecessor, fixing incident ORCH-116/ORCH-123. The
terminal set ``{done,cancelled}`` (adr-0026) is UNCHANGED; ``task_deps`` / ``stages.py``
do NOT read ``paused_at`` (pause never bypasses a freeze or a declared dependency).
Gated by the independent sub-flag ``serial_gate_pause_enabled``. Its default (True) is
a no-op until the first explicit pause, because ``paused_at`` is NULL on every row.
"""
from __future__ import annotations

import logging
import re

from . import db
from .config import settings

logger = logging.getLogger("orchestrator.serial_gate")

# Repo tokens embedded into the claim SQL ``IN (...)`` list must match this: a
# guard against a broken/injected ORCH_SERIAL_GATE_REPOS CSV (R-6). The CSV is an
# operator config (not user input), but the guard is mandatory; an invalid token
# is dropped with a warning (see ``_scope_repos``).
_REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$")


# ---------------------------------------------------------------------------
# Conditionality (mirrors post_deploy_applies / _merge_gate_applies)
# ---------------------------------------------------------------------------
def _scope_repos() -> set[str]:
    """Sanitised set of in-scope repo tokens from ``serial_gate_repos`` (CSV).

    Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D5). Invalid
    tokens (regex miss) are dropped and logged. Never raises.
    """
    try:
        raw = (settings.serial_gate_repos or "").strip()
    except Exception:  # noqa: BLE001
        return set()
    if not raw:
        return set()
    out: set[str] = set()
    for tok in raw.split(","):
        t = tok.strip()
        if t and _REPO_TOKEN.match(t):
            out.add(t)
        elif t:
            logger.warning("serial_gate: dropping invalid repo token %r from CSV", t)
    return out


def serial_gate_applies(repo: str) -> bool:
    """Whether the serial gate is REAL for this repo (D5 / AC-7).

    * ``serial_gate_enabled=False`` -> always False (kill-switch; claim and
      start_pipeline behave exactly as before ORCH-088).
    * ``serial_gate_repos`` (CSV) non-empty -> real only for listed repos.
    * empty CSV -> real for ALL repos (serial e2e + anti-stale-base help every
      repo, unlike the self-hosting-only ORCH-35/43/58 gates).

    Never raises -> False on error (degrade to "gate inert", the safe-for-flow
    default that matches the kill-switch off behaviour).
    """
    try:
        if not getattr(settings, "serial_gate_enabled", False):
            return False
        scope = _scope_repos()
        if scope:
            return (repo or "").strip() in scope
        return True
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("serial_gate_applies error for %s: %s", repo, e)
        return False


def _freeze_layer_enabled() -> bool:
    """Whether the FR-5 freeze layer is active (independent toggle, D7)."""
    try:
        return bool(getattr(settings, "serial_gate_freeze_enabled", False))
    except Exception:  # noqa: BLE001
        return False


def _pause_layer_enabled() -> bool:
    """ORCH-124 (adr-0051 D6): whether the per-task pause axis is active.

    Independent toggle ``serial_gate_pause_enabled`` (mirror of
    ``_freeze_layer_enabled``). When True the "active task" predicate of all three
    serial-gate points additionally requires ``paused_at IS NULL`` (paused tasks
    are excluded); when False the pause-term is omitted and serial-gate behaves
    byte-for-byte as ORCH-088/090. The config default (True) is a no-op until an
    operator parks a task (``paused_at`` is NULL for every row). never-raise ->
    False (pause inert).
    """
    try:
        return bool(getattr(settings, "serial_gate_pause_enabled", False))
    except Exception:  # noqa: BLE001
        return False


# ---------------------------------------------------------------------------
# Read helpers (active task + freeze) — only the local DB
# ---------------------------------------------------------------------------
def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool:
    """True iff repo has an active task, excluding ``exclude_task_id``.

    "Active" means ``stage NOT IN ('done','cancelled')`` (ORCH-090) and, while the
    pause layer is on, ``paused_at IS NULL`` (ORCH-124). ``exclude_task_id`` is the
    task being evaluated (a new/rework task must not count ITSELF as the active
    task that blocks it, R-7). Observability/Python mirror of the SQL gate; never
    raises -> False on error.
    """
    try:
        conn = db.get_db()
        try:
            # ORCH-090 (adr-0026): terminal set is {done,cancelled}. A cancelled
            # task must NOT count as "active" or it would block the repo's serial
            # gate forever.
            # ORCH-124 (adr-0051 D4.2): under the pause layer a PARKED task
            # (paused_at NOT NULL) is likewise NOT "active" — it must not hold the
            # FIFO gate against an urgent successor. Same predicate as the hot SQL
            # (D4.1) and the snapshot (D4.3) so the three points never drift (TR-7).
            pause_term = " AND paused_at IS NULL" if _pause_layer_enabled() else ""
            if exclude_task_id is not None:
                row = conn.execute(
                    "SELECT 1 FROM tasks WHERE repo=? AND id != ? "
                    f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
                    (repo, exclude_task_id),
                ).fetchone()
            else:
                row = conn.execute(
                    "SELECT 1 FROM tasks WHERE repo=? "
                    f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
                    (repo,),
                ).fetchone()
            return row is not None
        finally:
            conn.close()
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("repo_has_active_task error for %s: %s", repo, e)
        return False


def _active_freeze_row(repo: str) -> dict | None:
    """Most-recent active (``cleared_at IS NULL``) freeze row for repo, or None.

    Raises on a real DB error (the caller decides fail-open vs fail-closed) — this
    private helper does NOT swallow so ``is_repo_frozen`` can fail CLOSED.
    """
    conn = db.get_db()
    try:
        row = conn.execute(
            "SELECT repo, frozen_at, reason, work_item_id FROM repo_freeze "
            "WHERE repo=? AND cleared_at IS NULL ORDER BY id DESC LIMIT 1",
            (repo,),
        ).fetchone()
        return dict(row) if row else None
    finally:
        conn.close()


def is_repo_frozen(repo: str) -> bool:
    """True iff repo currently has an active freeze (FR-5).

    fail-CLOSED (AC-9): when the freeze layer is enabled and we CANNOT confirm the
    absence of a freeze (DB error), return True — keep the gate closed for prod
    safety. When the freeze layer is disabled the repo is never considered frozen.
    """
    if not _freeze_layer_enabled():
        return False
    try:
        return _active_freeze_row(repo) is not None
    except Exception as e:  # noqa: BLE001 - fail-CLOSED on doubt (AC-9)
        logger.warning("is_repo_frozen error for %s -> fail-CLOSED (frozen): %s", repo, e)
        return True


# ---------------------------------------------------------------------------
# Freeze mutators (FR-5)
# ---------------------------------------------------------------------------
def set_repo_freeze(repo: str, reason: str = "", work_item_id: str | None = None) -> bool:
    """Insert a durable freeze row for repo (no-op when the freeze layer is off).

    Append-only: a repeated DEGRADED while already frozen simply adds another row
    (history). Duplicate active rows are harmless because both ``is_repo_frozen``
    and the claim clause only test whether an active row exists. Returns True iff
    a row was inserted. never-raise -> False on error (a freeze write failure must
    not crash the post-deploy monitor tick).
    """
    if not _freeze_layer_enabled():
        logger.info("set_repo_freeze: freeze layer disabled, skipping for %s", repo)
        return False
    if not repo:
        return False
    try:
        conn = db.get_db()
        try:
            conn.execute(
                "INSERT INTO repo_freeze (repo, reason, work_item_id) VALUES (?, ?, ?)",
                (repo, reason or None, work_item_id),
            )
            conn.commit()
        finally:
            conn.close()
        logger.warning(
            "serial_gate: repo %s FROZEN (reason=%r, work_item=%s) — next task will "
            "NOT start until manual unfreeze", repo, reason, work_item_id,
        )
        return True
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("set_repo_freeze error for %s: %s", repo, e)
        return False


def clear_repo_freeze(repo: str) -> int:
    """Clear ALL active freeze rows for repo (operator unfreeze, D4).

    Sets ``cleared_at=now`` on every row with ``cleared_at IS NULL``. Idempotent
    (a repeat clears 0 rows). Returns the number of rows cleared. never-raise -> 0
    on error.
    """
    if not repo:
        return 0
    try:
        conn = db.get_db()
        try:
            cur = conn.execute(
                "UPDATE repo_freeze SET cleared_at=datetime('now') "
                "WHERE repo=? AND cleared_at IS NULL",
                (repo,),
            )
            conn.commit()
            n = cur.rowcount or 0
        finally:
            conn.close()
        if n:
            logger.warning("serial_gate: repo %s UNFROZEN (%d row(s) cleared)", repo, n)
        return n
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("clear_repo_freeze error for %s: %s", repo, e)
        return 0


# ---------------------------------------------------------------------------
# Hot-claim SQL fragment (fail-OPEN) — ADR-001 D1
# ---------------------------------------------------------------------------
def build_claim_clause() -> str:
    """Build the ``AND NOT (...)`` fragment spliced into ``claim_next_job``.

    Blocks an analyst-job whose repo (a) has an EARLIER-queued unfinished task or
    (b) is frozen. Only ``jobs.agent='analyst'`` is gated — jobs of an
    already-active task pass freely (else the single active task could never
    advance).

    Ordering term — ``t2.id < jobs.task_id`` (FIFO, ADR-001 D1 / FR-2): a task is
    blocked only by EARLIER tasks (lower ``tasks.id``) that are still unfinished
    (non-terminal and, under the pause layer, not paused). This is the FIFO
    refinement of the ADR's pseudo-SQL ``t2.id != jobs.task_id``: with ``!=`` a
    BATCH of fresh tasks all sitting in ``analysis`` would mutually block (each is
    "another unfinished task" for the others) -> the whole serial queue deadlocks,
    contradicting FR-2 ("strictly one at a time, FIFO by jobs.id"). ``<`` admits
    exactly the oldest unfinished task and serialises the rest behind it, while
    still never self-blocking a new/rework analyst-job on its OWN row (R-7) and
    keeping AC-1 (a newer task is held by the older active one) intact.

    Repo scope: empty CSV -> no repo filter (all repos); non-empty CSV -> ``AND
    jobs.repo IN ('a','b')`` with sanitised tokens (R-6).

    fail-OPEN (AC-8): kill-switch off OR any build error -> ``""`` (claim behaves
    exactly as before ORCH-088). The trailing space keeps the spliced SQL valid.
    """
    try:
        if not getattr(settings, "serial_gate_enabled", False):
            return ""
        scope = _scope_repos()
        if scope:
            # All tokens already passed the _REPO_TOKEN regex -> safe to embed.
            repo_in = ", ".join(f"'{t}'" for t in sorted(scope))
            repo_scope = f"AND jobs.repo IN ({repo_in}) "
        else:
            repo_scope = ""
        # ORCH-090 (adr-0026): {done,cancelled} are both terminal — an EARLIER
        # cancelled task no longer holds the FIFO serial gate closed.
        # ORCH-124 (adr-0051 D4.1): under the pause layer an EARLIER PARKED task
        # (paused_at NOT NULL) also no longer holds the FIFO gate — an urgent
        # successor may overtake it. The pause-term is appended INSIDE the existing
        # EXISTS subquery (no extra JOIN/EXISTS), reads only the local DB (offline
        # hot path, NFR-2), and is built inside the same try/except so any error in
        # the pause sub-expression still fails OPEN (D9). Pause layer off ->
        # pause_term is "" -> the clause is byte-for-byte ORCH-088/090.
        pause_term = " AND t2.paused_at IS NULL" if _pause_layer_enabled() else ""
        active_clause = (
            "EXISTS (SELECT 1 FROM tasks t2 "
            "WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id "
            f"AND t2.stage NOT IN ('done','cancelled'){pause_term}) "
        )
        if _freeze_layer_enabled():
            freeze_clause = (
                "OR EXISTS (SELECT 1 FROM repo_freeze f "
                "WHERE f.repo = jobs.repo AND f.cleared_at IS NULL) "
            )
        else:
            freeze_clause = ""
        return (
            "AND NOT ( jobs.agent = 'analyst' "
            f"{repo_scope}"
            f"AND ( {active_clause}{freeze_clause}) "
            ") "
        )
    except Exception as e:  # noqa: BLE001 - fail-OPEN: never wedge the queue
        logger.warning("build_claim_clause error -> fail-OPEN (no gate): %s", e)
        return ""


# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (D9 / AC-10)
# ---------------------------------------------------------------------------
def _known_repos() -> list[str]:
    """Registered repo names (best-effort) plus any repo with live gate state."""
    repos: set[str] = set()
    try:
        from . import projects
        for p in projects.PROJECTS:
            if getattr(p, "repo", None):
                repos.add(p.repo)
    except Exception:  # noqa: BLE001
        pass
    # Also surface repos that have an active freeze or a queued analyst-job even if
    # they are not in the static registry (defensive — never hide a frozen repo).
    try:
        conn = db.get_db()
        try:
            for (r,) in conn.execute(
                "SELECT DISTINCT repo FROM repo_freeze WHERE cleared_at IS NULL"
            ).fetchall():
                if r:
                    repos.add(r)
            for (r,) in conn.execute(
                "SELECT DISTINCT repo FROM jobs WHERE status='queued' AND agent='analyst'"
            ).fetchall():
                if r:
                    repos.add(r)
        finally:
            conn.close()
    except Exception:  # noqa: BLE001
        pass
    return sorted(repos)


def _waiting_reason(conn, repo: str, task_id: int | None, *,
                    frozen: bool, pause_on: bool, deps_on: bool) -> str | None:
    """ORCH-124 (adr-0051 D5): why an analyst-job is NOT claimable, or None.

    Priority order (matches the precedence of the actual claim gates):
    ``freeze`` (active repo_freeze) -> ``dependency`` (an unfinished declared
    job_deps predecessor, only when task_deps is on) -> ``active-task`` (an EARLIER
    NON-paused unfinished task holds the FIFO gate) -> ``None`` (claimable). A
    paused predecessor is deliberately NOT a reason — by design it does NOT block,
    so it surfaces only via the snapshot's ``paused`` list, never here. never-raise
    -> None on error (observability only, conservative).
    """
    try:
        if frozen:
            return "freeze"
        if deps_on and task_id is not None:
            dep = conn.execute(
                "SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
                "WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled') LIMIT 1",
                (task_id,),
            ).fetchone()
            if dep is not None:
                return "dependency"
        if task_id is not None:
            pause_term = " AND paused_at IS NULL" if pause_on else ""
            earlier = conn.execute(
                "SELECT 1 FROM tasks WHERE repo=? AND id < ? "
                f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
                (repo, task_id),
            ).fetchone()
            if earlier is not None:
                return "active-task"
        return None
    except Exception:  # noqa: BLE001 - observability only
        return None


def _per_repo_snapshot(repo: str) -> dict:
    """Per-repo gate state for the /queue snapshot (never raises here)."""
    active_task = None
    waiting: list[dict] = []
    paused: list[dict] = []
    # ORCH-124 (adr-0051 D5): compute frozen up-front so the per-job reason can be
    # derived in the same pass. is_repo_frozen uses its own connection (separate
    # from the snapshot conn below).
    frozen = is_repo_frozen(repo)
    pause_on = _pause_layer_enabled()
    try:
        deps_on = bool(getattr(settings, "task_deps_enabled", False))
    except Exception:  # noqa: BLE001
        deps_on = False
    try:
        conn = db.get_db()
        try:
            # ORCH-090 (adr-0026): terminal set {done,cancelled}.
            # ORCH-124 (adr-0051 D4.3): a PARKED task is excluded from active_task
            # (same predicate as build_claim_clause / repo_has_active_task — no
            # drift, TR-7); it surfaces in the additive `paused` list instead.
            pause_term = " AND paused_at IS NULL" if pause_on else ""
            row = conn.execute(
                "SELECT work_item_id, stage FROM tasks "
                f"WHERE repo=? AND stage NOT IN ('done','cancelled'){pause_term} "
                "ORDER BY id LIMIT 1",
                (repo,),
            ).fetchone()
            if row:
                active_task = {"work_item_id": row["work_item_id"], "stage": row["stage"]}
            # ORCH-124: additive `paused` list — non-terminal parked tasks of the
            # repo (visible, but NOT counted as active_task). Only meaningful while
            # the pause layer is on.
            if pause_on:
                for pr in conn.execute(
                    "SELECT work_item_id, stage, paused_at FROM tasks "
                    "WHERE repo=? AND stage NOT IN ('done','cancelled') "
                    "AND paused_at IS NOT NULL ORDER BY id",
                    (repo,),
                ).fetchall():
                    paused.append({
                        "work_item_id": pr["work_item_id"],
                        "stage": pr["stage"],
                        "paused_at": pr["paused_at"],
                    })
            for j in conn.execute(
                "SELECT j.id AS job_id, j.task_id AS task_id, "
                "t.work_item_id AS work_item_id, t.stage AS stage "
                "FROM jobs j LEFT JOIN tasks t ON t.id = j.task_id "
                "WHERE j.repo=? AND j.status='queued' AND j.agent='analyst' "
                "ORDER BY j.id",
                (repo,),
            ).fetchall():
                waiting.append({
                    "job_id": j["job_id"],
                    "work_item_id": j["work_item_id"],
                    "stage": j["stage"],
                    # ORCH-124 (D5): why this job is held (freeze/dependency/
                    # active-task) or None when claimable.
                    "reason": _waiting_reason(
                        conn, repo, j["task_id"],
                        frozen=frozen, pause_on=pause_on, deps_on=deps_on,
                    ),
                })
        finally:
            conn.close()
    except Exception as e:  # noqa: BLE001
        logger.warning("serial_gate per-repo snapshot error for %s: %s", repo, e)
    frozen_reason = None
    frozen_at = None
    if frozen:
        try:
            fr = _active_freeze_row(repo)
            if fr:
                frozen_reason = fr.get("reason")
                frozen_at = fr.get("frozen_at")
        except Exception:  # noqa: BLE001
            pass
    return {
        "active_task": active_task,
        "waiting": waiting,
        # ORCH-124 (D5): additive — parked predecessors (not shown as active_task).
        "paused": paused,
        "frozen": frozen,
        "frozen_reason": frozen_reason,
        "frozen_at": frozen_at,
    }


def snapshot() -> dict:
    """Read-only serial-gate summary for GET /queue (D9 / AC-10).

    Additive block; existing /queue keys are untouched. never-raise: any error ->
    a minimal dict with the flags and empty per-repo data.
    """
    try:
        enabled = bool(getattr(settings, "serial_gate_enabled", False))
    except Exception:  # noqa: BLE001
        enabled = False
    try:
        repos_cfg = getattr(settings, "serial_gate_repos", "") or ""
    except Exception:  # noqa: BLE001
        repos_cfg = ""
    try:
        per_repo = {r: _per_repo_snapshot(r) for r in _known_repos()}
        return {
            "enabled": enabled,
            "freeze_enabled": _freeze_layer_enabled(),
            "repos": repos_cfg,
            "per_repo": per_repo,
        }
    except Exception as e:  # noqa: BLE001 - never-raise -> minimal dict
        logger.warning("serial_gate snapshot error: %s", e)
        return {
            "enabled": enabled,
            "freeze_enabled": False,
            "repos": repos_cfg,
            "per_repo": {},
        }
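The `build_claim_clause` docstring makes three claims:

- `t2.id != jobs.task_id` would deadlock a batch of fresh tasks.
- `<` admits exactly the oldest one.
- The ORCH-124 pause term lets a successor overtake a parked predecessor.

This self-contained sketch checks those claims against toy `tasks`/`jobs` tables. The clause is simplified: it has no repo scope and no freeze term. The `claimable` helper and the repo name `orch` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tasks (id INTEGER PRIMARY KEY, repo TEXT, stage TEXT, paused_at TEXT);
CREATE TABLE jobs (id INTEGER PRIMARY KEY, task_id INTEGER, repo TEXT, agent TEXT);
-- A batch of three fresh tasks in one repo, all still in analysis.
INSERT INTO tasks VALUES (1, 'orch', 'analysis', NULL),
                         (2, 'orch', 'analysis', NULL),
                         (3, 'orch', 'analysis', NULL);
INSERT INTO jobs VALUES (10, 1, 'orch', 'analyst'),
                        (11, 2, 'orch', 'analyst'),
                        (12, 3, 'orch', 'analyst');
""")


def claimable(order_op, pause_term=""):
    """Ids of queued jobs that pass a simplified serial-gate clause."""
    # Same shape as the build_claim_clause fragment, minus repo scope and freeze.
    clause = (
        "AND NOT ( jobs.agent = 'analyst' "
        "AND ( EXISTS (SELECT 1 FROM tasks t2 "
        f"WHERE t2.repo = jobs.repo AND t2.id {order_op} jobs.task_id "
        f"AND t2.stage NOT IN ('done','cancelled'){pause_term}) ) ) "
    )
    sql = "SELECT id FROM jobs WHERE 1 = 1 " + clause + "ORDER BY id"
    return [row[0] for row in conn.execute(sql)]


print(claimable("!="))  # []   -> each task blocks the others: deadlock
print(claimable("<"))   # [10] -> only the oldest task is admitted

# Task 1's analyst-job is claimed, then an operator parks task 1.
conn.execute("DELETE FROM jobs WHERE id = 10")
conn.execute("UPDATE tasks SET paused_at = datetime('now') WHERE id = 1")
print(claimable("<"))                               # []   -> ORCH-116/123 behaviour
print(claimable("<", " AND t2.paused_at IS NULL"))  # [11] -> successor overtakes
```

Task 3 stays queued in the last case. Task 2 is now the oldest unparked unfinished task and holds the gate, so FIFO is preserved among non-paused tasks.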