fix(serial-gate): pause-without-blocking via per-task park signal (ORCH-124)
All checks were successful
CI / test (push) Successful in 1m12s
CI / test (pull_request) Successful in 1m17s

Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task"
purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses
Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change
tasks.stage (layer A), so a paused predecessor was indistinguishable from an active
one and held the FIFO gate closed against an urgent successor — the urgent fix
could not start until the paused task was formally done.

Introduces an explicit, durable, DB-resolvable per-task "park" signal — additive
nullable column tasks.paused_at (pattern of cancelled_at/track) — and a new
ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes
`stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points
(build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set
{done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged
(adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused
declared dependency and an active repo_freeze STILL block (pause never bypasses
them — different axes). Anti-stale-base on resume relies on the existing deferred
branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test
(ORCH-026/093/110) — no new rebase machinery.

Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim
fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_*
/ machine-verdict keys / existing table schemas are byte-for-byte untouched (this is
a queue-scheduler + observability change, not a Quality Gate).

- src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers
- src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused`
  list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot
- src/config.py + .env.example: serial_gate_pause_enabled (default True = true no-op)
- src/main.py: POST /serial-gate/pause|resume?work_item=<id> (by образцу unfreeze)
- tests/test_orch124_serial_gate_pause.py: TC-01 mandatory incident regress + TC-02..15
- CHANGELOG.md: [Unreleased] entry

ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md
Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md

Refs: ORCH-124

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 19:35:55 +03:00
parent de4f067655
commit 87af857082
8 changed files with 683 additions and 8 deletions

View File

@@ -1001,9 +1001,22 @@ class Settings(BaseSettings):
# layer (env ORCH_SERIAL_GATE_FREEZE_ENABLED). False
# -> freeze is neither set (post-deploy DEGRADED) nor
# consulted in the claim gate.
# serial_gate_pause_enabled -> ORCH-124 (adr-0051 D6): independent tumbler for
# the per-task "park" axis (env
# ORCH_SERIAL_GATE_PAUSE_ENABLED). True (default) ->
# a task with tasks.paused_at NOT NULL is excluded
# from the serial-gate "active task" predicate so an
# URGENT successor may overtake a paused predecessor.
# Default is a TRUE no-op until an operator pauses a
# task (paused_at is NULL for all rows). False ->
# pause-term omitted, serial-gate is byte-for-byte
# ORCH-088/090 (deliberate rollback). Scope reuses
# serial_gate_repos (no new *_repos flag); subordinate
# to the serial_gate_enabled kill-switch.
serial_gate_enabled: bool = True
serial_gate_repos: str = ""
serial_gate_freeze_enabled: bool = True
serial_gate_pause_enabled: bool = True
# ORCH-090: STOP-status task cancellation (stop active agent + full progress
# reset) and the relaunch-hole close. A new logical Plane key `stop` (fail-closed,

100
src/db.py
View File

@@ -147,6 +147,17 @@ def init_db():
# after a successful atomic create). Read in advance_stage for the routing-override
# (skips architecture) — from the DB, NEVER from the network (NFR-4).
_ensure_column(conn, "tasks", "track", "TEXT DEFAULT 'full'")
# ORCH-124 (08-data-requirements.md, ADR-001 D2): per-task durable "park"
# signal for the serial gate. Additive, idempotent (_ensure_column is a no-op
# once present) -> safe on the live shared prod DB (enduro untouched), exactly
# like tasks.cancelled_at / tasks.cancel_requested_at / tasks.track above.
# paused_at -> NULL = not paused; ISO timestamp (datetime('now')) = an
# operator explicitly parked the task (POST /serial-gate/pause).
# Read ONLY by the serial-gate "active task" predicate (ORTHOGONAL to the
# {done,cancelled} terminal axis — task_deps/stages.py do NOT read it, adr-0026
# is untouched). All existing rows default to NULL -> pre-ORCH-124 behaviour
# holds until the first explicit operator pause.
_ensure_column(conn, "tasks", "paused_at", "TEXT")
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
# scheduler gate in claim_next_job keeps B queued until every A reaches
@@ -776,6 +787,95 @@ def get_task_track(task_id: int) -> str:
return "full"
# ---------------------------------------------------------------------------
# ORCH-124: serial-gate per-task park signal (tasks.paused_at) helpers
# ---------------------------------------------------------------------------
def set_task_paused(task_id: int) -> bool:
"""ORCH-124 (ADR-001 D7): park a task for the serial gate (idempotent).
Stamps ``tasks.paused_at=datetime('now')`` so the serial-gate "active task"
predicate stops counting this task as a FIFO blocker (an URGENT successor may
overtake it). Durable (survives restart) and DB-resolvable — the hot-claim SQL
reads it locally without any network call. Re-pausing an already-paused task
keeps the original timestamp (``WHERE paused_at IS NULL``), so the park moment
is stable. never-raise -> False on error (a write failure must not crash the
operator endpoint / worker).
"""
if task_id is None:
return False
try:
conn = get_db()
try:
conn.execute(
"UPDATE tasks SET paused_at=datetime('now') "
"WHERE id=? AND paused_at IS NULL",
(task_id,),
)
conn.commit()
finally:
conn.close()
return True
except Exception as e: # noqa: BLE001 - never-raise
import logging
logging.getLogger("orchestrator.db").warning(
"set_task_paused error for task %s: %s", task_id, e
)
return False
def clear_task_paused(task_id: int) -> bool:
"""ORCH-124 (ADR-001 D7): resume a parked task (idempotent).
Clears ``tasks.paused_at`` back to NULL so the task re-enters the serial-gate
FIFO (holds the gate as active again, or re-enters with a deferred branch cut —
see ADR-001 D8). Resuming a task that is not paused is a no-op. never-raise ->
False on error.
"""
if task_id is None:
return False
try:
conn = get_db()
try:
conn.execute(
"UPDATE tasks SET paused_at=NULL WHERE id=?",
(task_id,),
)
conn.commit()
finally:
conn.close()
return True
except Exception as e: # noqa: BLE001 - never-raise
import logging
logging.getLogger("orchestrator.db").warning(
"clear_task_paused error for task %s: %s", task_id, e
)
return False
def is_task_paused(task_id: int) -> bool:
"""ORCH-124: read whether a task is currently parked; missing/error -> False.
Conservative fail direction (ADR-001 D9): on any read error we report "not
paused" so the task is treated as active -> the serial gate stays CLOSED rather
than wrongly opening (anti-stale-base safe). Mirror of ``get_task_track``.
"""
if task_id is None:
return False
try:
conn = get_db()
try:
row = conn.execute(
"SELECT paused_at FROM tasks WHERE id=?", (task_id,)
).fetchone()
finally:
conn.close()
if not row:
return False
return row["paused_at"] is not None
except Exception: # noqa: BLE001 - conservative: not paused -> stays active
return False
# ---------------------------------------------------------------------------
# Telegram live tracker helpers (feat/telegram-live-tracker)
# ---------------------------------------------------------------------------

View File

@@ -376,6 +376,84 @@ async def serial_gate_unfreeze(repo: str = ""):
return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}
@app.post("/serial-gate/pause")
async def serial_gate_pause(work_item: str = ""):
"""ORCH-124 (adr-0051 D7): park a task so the serial gate stops counting it as
an active FIFO blocker — an urgent successor may overtake it.
Explicit, durable, DB-resolvable operator intent (NOT a Plane-status gesture):
stamps ``tasks.paused_at`` so the offline hot-claim SQL reads it locally without
a network call. Pause does NOT bypass a ``repo_freeze`` or a declared dependency
(different axes) and is NOT terminal (distinct from STOP/cancel). By образцу
``POST /serial-gate/unfreeze``; never-raise. Pausing a terminal (done/cancelled)
task is a no-op. When the pause sub-flag is off the call is a no-op + warning
(the pause-term is omitted from the gate, so a column write would be latent).
"""
from . import db
from . import serial_gate
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
if not serial_gate._pause_layer_enabled():
return {"ok": False, "error": "serial_gate_pause_enabled is off (no-op)",
"work_item": work_item}
task = db.get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "unknown work_item", "work_item": work_item}
task_id = task["id"]
stage = task.get("stage")
if stage in ("done", "cancelled"):
return {"ok": False, "error": f"task is terminal (stage={stage})",
"work_item": work_item, "task_id": task_id, "stage": stage}
ok = db.set_task_paused(task_id)
refreshed = db.get_task_by_work_item_id(work_item) or {}
paused_at = refreshed.get("paused_at")
if ok:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"⏸️ {link_for(work_item)}: задача поставлена на ПАУЗУ для serial-gate "
f"(task {task_id}, stage={stage}). Срочный успешник репо может обогнать; "
f"resume — POST /serial-gate/resume."
)
except Exception:
pass
return {"ok": ok, "work_item": work_item, "task_id": task_id,
"stage": stage, "paused_at": paused_at}
@app.post("/serial-gate/resume")
async def serial_gate_resume(work_item: str = ""):
"""ORCH-124 (adr-0051 D7 / AC-10): resume a parked task — it re-enters the
serial gate (holds it as active again / re-enters FIFO with the deferred branch
cut, D8). Inverse of ``POST /serial-gate/pause``; idempotent (resuming a task
that is not paused clears nothing). Anti-stale-base on resume is guaranteed by
the EXISTING mechanisms (deferred branch cut + pre-merge auto_rebase_onto_main +
merge-gate re-test, ORCH-088/093/110) — no new rebase machinery. never-raise.
"""
from . import db
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
task = db.get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "unknown work_item", "work_item": work_item}
task_id = task["id"]
was_paused = task.get("paused_at") is not None
ok = db.clear_task_paused(task_id)
if ok and was_paused:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"▶️ {link_for(work_item)}: задача СНЯТА С ПАУЗЫ (task {task_id}) — "
f"снова участвует в serial-gate."
)
except Exception:
pass
return {"ok": ok, "work_item": work_item, "task_id": task_id,
"was_paused": was_paused, "paused_at": None}
@app.post("/transition-lease/release")
async def transition_lease_release(work_item: str = ""):
"""ORCH-114 (adr-0045 / D10): operator manual reclaim of a stuck transition-lease.

View File

@@ -23,6 +23,16 @@ Two deliberately different failure directions (ADR-001 D10, NFR-1):
must not wedge the queue of ALL projects (AC-8).
* freeze decision (``is_repo_frozen``) -> fail-CLOSED (``True``): when we cannot
confirm the ABSENCE of a freeze we keep the gate closed for prod safety (AC-9).
ORCH-124 (adr-0051): adds an ORTHOGONAL "pause" axis to the "active task" predicate
of all three points (``build_claim_clause`` / ``repo_has_active_task`` /
``_per_repo_snapshot``). A task with ``tasks.paused_at`` NOT NULL (an operator
``POST /serial-gate/pause``) is excluded from the FIFO "active" set so an URGENT
successor may overtake a paused predecessor — fixing incident ORCH-116/ORCH-123. The
terminal set ``{done,cancelled}`` (adr-0026) is UNCHANGED; ``task_deps`` / ``stages.py``
do NOT read ``paused_at`` (pause never bypasses a freeze or a declared dependency).
Gated by the independent sub-flag ``serial_gate_pause_enabled`` (default True is a true
no-op until the first explicit pause).
"""
from __future__ import annotations
@@ -97,6 +107,22 @@ def _freeze_layer_enabled() -> bool:
return False
def _pause_layer_enabled() -> bool:
"""ORCH-124 (adr-0051 D6): whether the per-task pause axis is active.
Independent tumbler ``serial_gate_pause_enabled`` (mirror of
``_freeze_layer_enabled``). When True the "active task" predicate of all three
serial-gate points additionally excludes paused tasks (``paused_at IS NULL``);
when False the pause-term is omitted and serial-gate behaves byte-for-byte as
ORCH-088/090. Default True is a true no-op until an operator parks a task
(``paused_at`` is NULL for every row). never-raise -> False (pause inert).
"""
try:
return bool(getattr(settings, "serial_gate_pause_enabled", False))
except Exception: # noqa: BLE001
return False
# ---------------------------------------------------------------------------
# Read helpers (active task + freeze) — only the local DB
# ---------------------------------------------------------------------------
@@ -113,16 +139,21 @@ def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool:
# ORCH-090 (adr-0026): terminal set is {done,cancelled}. A cancelled
# task must NOT count as "active" or it would block the repo's serial
# gate forever.
# ORCH-124 (adr-0051 D4.2): under the pause layer a PARKED task
# (paused_at NOT NULL) is likewise NOT "active" — it must not hold the
# FIFO gate against an urgent successor. Same predicate as the hot SQL
# (D4.1) and the snapshot (D4.3) so the three points never drift (TR-7).
pause_term = " AND paused_at IS NULL" if _pause_layer_enabled() else ""
if exclude_task_id is not None:
row = conn.execute(
"SELECT 1 FROM tasks WHERE repo=? AND id != ? "
"AND stage NOT IN ('done','cancelled') LIMIT 1",
f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
(repo, exclude_task_id),
).fetchone()
else:
row = conn.execute(
"SELECT 1 FROM tasks WHERE repo=? "
"AND stage NOT IN ('done','cancelled') LIMIT 1",
f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
(repo,),
).fetchone()
return row is not None
@@ -271,10 +302,18 @@ def build_claim_clause() -> str:
repo_scope = ""
# ORCH-090 (adr-0026): {done,cancelled} are both terminal — an EARLIER
# cancelled task no longer holds the FIFO serial gate closed.
# ORCH-124 (adr-0051 D4.1): under the pause layer an EARLIER PARKED task
# (paused_at NOT NULL) also no longer holds the FIFO gate — an urgent
# successor may overtake it. The pause-term is appended INSIDE the existing
# EXISTS subquery (no extra JOIN/EXISTS), reads only the local DB (offline
# hot path, NFR-2), and is built inside the same try/except so any error in
# the pause sub-expression still fails-OPEN (D9). pause off / kill-switch ->
# pause_term is "" -> the clause is byte-for-byte ORCH-088/090.
pause_term = " AND t2.paused_at IS NULL" if _pause_layer_enabled() else ""
active_clause = (
"EXISTS (SELECT 1 FROM tasks t2 "
"WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id "
"AND t2.stage NOT IN ('done','cancelled')) "
f"AND t2.stage NOT IN ('done','cancelled'){pause_term}) "
)
if _freeze_layer_enabled():
freeze_clause = (
@@ -329,23 +368,91 @@ def _known_repos() -> list[str]:
return sorted(repos)
def _waiting_reason(conn, repo: str, task_id: int | None, *,
frozen: bool, pause_on: bool, deps_on: bool) -> str | None:
"""ORCH-124 (adr-0051 D5): why an analyst-job is NOT claimable, or None.
Priority order (matches the precedence of the actual claim gates):
``freeze`` (active repo_freeze) -> ``dependency`` (an unfinished declared
job_deps predecessor, only when task_deps is on) -> ``active-task`` (an EARLIER
NON-paused unfinished task holds the FIFO gate) -> ``None`` (claimable). A
paused predecessor is deliberately NOT a reason — by design it does NOT block,
so it surfaces only via the snapshot's ``paused`` list, never here. never-raise
-> None on error (observability only, conservative).
"""
try:
if frozen:
return "freeze"
if deps_on and task_id is not None:
dep = conn.execute(
"SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
"WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled') LIMIT 1",
(task_id,),
).fetchone()
if dep is not None:
return "dependency"
if task_id is not None:
pause_term = " AND paused_at IS NULL" if pause_on else ""
earlier = conn.execute(
"SELECT 1 FROM tasks WHERE repo=? AND id < ? "
f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1",
(repo, task_id),
).fetchone()
if earlier is not None:
return "active-task"
return None
except Exception: # noqa: BLE001 - observability only
return None
def _per_repo_snapshot(repo: str) -> dict:
"""Per-repo gate state for the /queue snapshot (never raises here)."""
active_task = None
waiting: list[dict] = []
paused: list[dict] = []
# ORCH-124 (adr-0051 D5): compute frozen up-front so the per-job reason can be
# derived in the same pass. is_repo_frozen uses its own connection (separate
# from the snapshot conn below).
frozen = is_repo_frozen(repo)
pause_on = _pause_layer_enabled()
try:
deps_on = bool(getattr(settings, "task_deps_enabled", False))
except Exception: # noqa: BLE001
deps_on = False
try:
conn = db.get_db()
try:
# ORCH-090 (adr-0026): terminal set {done,cancelled}.
# ORCH-124 (adr-0051 D4.3): a PARKED task is excluded from active_task
# (same predicate as build_claim_clause / repo_has_active_task — no
# drift, TR-7); it surfaces in the additive `paused` list instead.
pause_term = " AND paused_at IS NULL" if pause_on else ""
row = conn.execute(
"SELECT work_item_id, stage FROM tasks "
"WHERE repo=? AND stage NOT IN ('done','cancelled') ORDER BY id LIMIT 1",
f"WHERE repo=? AND stage NOT IN ('done','cancelled'){pause_term} "
"ORDER BY id LIMIT 1",
(repo,),
).fetchone()
if row:
active_task = {"work_item_id": row["work_item_id"], "stage": row["stage"]}
# ORCH-124: additive `paused` list — non-terminal parked tasks of the
# repo (visible, but NOT counted as active_task). Only meaningful while
# the pause layer is on.
if pause_on:
for pr in conn.execute(
"SELECT work_item_id, stage, paused_at FROM tasks "
"WHERE repo=? AND stage NOT IN ('done','cancelled') "
"AND paused_at IS NOT NULL ORDER BY id",
(repo,),
).fetchall():
paused.append({
"work_item_id": pr["work_item_id"],
"stage": pr["stage"],
"paused_at": pr["paused_at"],
})
for j in conn.execute(
"SELECT j.id AS job_id, t.work_item_id AS work_item_id, t.stage AS stage "
"SELECT j.id AS job_id, j.task_id AS task_id, "
"t.work_item_id AS work_item_id, t.stage AS stage "
"FROM jobs j LEFT JOIN tasks t ON t.id = j.task_id "
"WHERE j.repo=? AND j.status='queued' AND j.agent='analyst' "
"ORDER BY j.id",
@@ -355,12 +462,17 @@ def _per_repo_snapshot(repo: str) -> dict:
"job_id": j["job_id"],
"work_item_id": j["work_item_id"],
"stage": j["stage"],
# ORCH-124 (D5): why this job is held (freeze/dependency/
# active-task) or None when claimable.
"reason": _waiting_reason(
conn, repo, j["task_id"],
frozen=frozen, pause_on=pause_on, deps_on=deps_on,
),
})
finally:
conn.close()
except Exception as e: # noqa: BLE001
logger.warning("serial_gate per-repo snapshot error for %s: %s", repo, e)
frozen = is_repo_frozen(repo)
frozen_reason = None
frozen_at = None
if frozen:
@@ -374,6 +486,8 @@ def _per_repo_snapshot(repo: str) -> dict:
return {
"active_task": active_task,
"waiting": waiting,
# ORCH-124 (D5): additive — parked predecessors (not shown as active_task).
"paused": paused,
"frozen": frozen,
"frozen_reason": frozen_reason,
"frozen_at": frozen_at,