From 87af857082a2a69c163cc8f87d4822d3c7bd7776 Mon Sep 17 00:00:00 2001 From: claude-bot Date: Tue, 16 Jun 2026 19:35:55 +0300 Subject: [PATCH] fix(serial-gate): pause-without-blocking via per-task park signal (ORCH-124) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task" purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change tasks.stage (layer A), so a paused predecessor was indistinguishable from an active one and held the FIFO gate closed against an urgent successor — the urgent fix could not start until the paused task was formally done. Introduces an explicit, durable, DB-resolvable per-task "park" signal — additive nullable column tasks.paused_at (pattern of cancelled_at/track) — and a new ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes `stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points (build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set {done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged (adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused declared dependency and an active repo_freeze STILL block (pause never bypasses them — different axes). Anti-stale-base on resume relies on the existing deferred branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test (ORCH-026/093/110) — no new rebase machinery. Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_* / machine-verdict keys / existing table schemas are byte-for-byte untouched (this is a queue-scheduler + observability change, not a Quality Gate). - src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers - src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused` list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot - src/config.py + .env.example: serial_gate_pause_enabled (default True = true no-op) - src/main.py: POST /serial-gate/pause|resume?work_item= (by образцу unfreeze) - tests/test_orch124_serial_gate_pause.py: TC-01 mandatory incident regress + TC-02..15 - CHANGELOG.md: [Unreleased] entry ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md Refs: ORCH-124 Co-Authored-By: Claude Opus 4.8 --- .env.example | 6 + .task-dev.md | 4 +- CHANGELOG.md | 11 + src/config.py | 13 + src/db.py | 100 +++++++ src/main.py | 78 ++++++ src/serial_gate.py | 126 ++++++++- tests/test_orch124_serial_gate_pause.py | 353 ++++++++++++++++++++++++ 8 files changed, 683 insertions(+), 8 deletions(-) create mode 100644 tests/test_orch124_serial_gate_pause.py diff --git a/.env.example b/.env.example index ebe44ca..9e560f2 100644 --- a/.env.example +++ b/.env.example @@ -230,9 +230,15 @@ ORCH_TASK_DEPS_SOURCE=db # SERIAL_GATE_ENABLED=false -> claim AND start_pipeline are 1:1 as before ORCH-088. # SERIAL_GATE_REPOS (CSV) -> scope; EMPTY = ALL repos (not self-hosting-only). # SERIAL_GATE_FREEZE_ENABLED=false -> the rollback-freeze layer is off (not set/read). +# SERIAL_GATE_PAUSE_ENABLED (ORCH-124) -> per-task "park" axis. true (default) -> a +# task with tasks.paused_at NOT NULL (POST /serial-gate/pause?work_item=) is +# excluded from the "active task" predicate so an URGENT successor may overtake a +# paused predecessor. TRUE no-op until an operator pauses a task. false -> pause-term +# omitted, serial-gate byte-for-byte ORCH-088/090. Scope reuses SERIAL_GATE_REPOS. ORCH_SERIAL_GATE_ENABLED=true ORCH_SERIAL_GATE_REPOS= ORCH_SERIAL_GATE_FREEZE_ENABLED=true +ORCH_SERIAL_GATE_PAUSE_ENABLED=true # ORCH-090: STOP-status task cancellation (stop active agent + full progress reset) # and the relaunch-hole close. A dedicated Plane "STOP" status (logical key `stop`, # fail-closed: absent from _DEFAULT_STATES, so a board without the status -> no-op) diff --git a/.task-dev.md b/.task-dev.md index 69f6747..327ea1c 100644 --- a/.task-dev.md +++ b/.task-dev.md @@ -1,4 +1,4 @@ -Work item: ORCH-116 +Work item: ORCH-124 Repo: orchestrator -Branch: feature/ORCH-116-orch-replace-llm-tester-with-d +Branch: feature/ORCH-124-bug-serial-gate-treats-backlog Stage: development \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c74080..792d499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,17 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Serial-gate «пауза без блокировки» — явный per-task park-сигнал** (ORCH-124, `fix`): багфикс (метка `Bug`, эскалирован в full-cycle) инцидента **ORCH-116/ORCH-123**. `serial_gate` определял «активную задачу репо» **исключительно по машинной стадии** `tasks.stage NOT IN ('done','cancelled')`, а Plane-статусы Backlog/Blocked/Needs-Input (слой B индикации, ORCH-066) **не меняют `tasks.stage`** (слой A) ⇒ приостановленный предшественник был неотличим от активного и держал FIFO-гейт закрытым против срочного успешника (ORCH-116 поставлен на паузу, чтобы пропустить фикс ORCH-123 — фикс не стартовал, пока ORCH-116 формально не `done`). У оператора не было чистого механизма «пауза без блокировки», отдельного от cancel (терминал) и от глобального выключения гейта. **Инвариант:** правка **планировщика очереди** (claim) и наблюдаемости, **не** Quality Gate — `STAGE_TRANSITIONS` / состав `QG_CHECKS` / семантика и имена `check_*` / machine-verdict ключи (`verdict:`/`result:`/`deploy_status:`/`staging_status:`/`security_status:`) / схемы существующих таблиц — **байт-в-байт не тронуты**. Аддитивно, под независимым под-флагом, never-raise, restart-safe, fail-OPEN на hot-claim сохранён. ADR: `docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md`, сквозной `docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md`. + - **Механизм (D1):** явный durable DB-сигнал «park» на уровне задачи, инициируемый оператором через API — **не** маппинг Plane-статуса (перегружал бы слой A/B ORCH-066 / анти-паттерн ORCH-059) и **не** `task_deps` (моделирует обратное направление «B ждёт A»). Чистое намерение, отличное от cancel и от kill-switch; DB-резолвимо, offline, webhook-независимо (потерянный webhook не рассинхронит сигнал). + - **Хранилище (D2):** аддитивная нуллабельная колонка `tasks.paused_at TEXT` через `_ensure_column` (паттерн `tasks.cancelled_at`/`cancel_requested_at`/`track`; `src/db.py`) — NULL = не на паузе; ISO-таймстамп = поставлена оператором на паузу. На уже-мигрированной БД — no-op; все существующие строки дефолтят в NULL ⇒ поведение до ORCH-124 до первой явной паузы (enduro не затронут на общей прод-БД). Хелперы `db.set_task_paused`/`clear_task_paused`/`is_task_paused` (never-raise; `is_task_paused` на ошибке → «не на паузе» = задача активна = гейт скорее закрыт = анти-stale-base-safe). + - **Ортогональная ось (D3, критично):** «активность» для serial-gate = `stage NOT IN ('done','cancelled') AND paused_at IS NULL`; **терминал `{done,cancelled}` остаётся байт-в-байт** в `serial_gate`/`task_deps`/`stages.py` (adr-0026 не регрессирует). `task_deps`/`stages.py` колонку `paused_at` **НЕ читают** ⇒ паузнутая объявленная зависимость (`job_deps`) и `repo_freeze` **по-прежнему блокируют** claim (пауза их **не** обходит — разные оси: freeze = весь репо, dependency = конкретная пара, пауза = «пропустите меня в FIFO»). + - **Три точки согласованно (D4, анти-дрейф):** один предикат «активна» под под-флагом — терм `AND t2.paused_at IS NULL` внутри существующего `EXISTS`-подзапроса `build_claim_clause` (горячий offline SQL, без лишнего JOIN), `AND paused_at IS NULL` в `repo_has_active_task` и в выборе `active_task` `_per_repo_snapshot` (`src/serial_gate.py`). Помечено маркером `ORCH-124` рядом с `ORCH-088`/`ORCH-090`. + - **Операторские эндпоинты (D7):** `POST /serial-gate/pause?work_item=` (стамп `paused_at`; терминальная/неизвестная задача → no-op-ответ; под-флаг off → no-op-предупреждение) и `POST /serial-gate/resume?work_item=` (сброс `paused_at` → задача снова участвует в гейте; идемпотентно) — по образцу `POST /serial-gate/unfreeze`, never-raise, с Telegram-подтверждением (`src/main.py`). + - **Анти-stale-base при resume (D8, R-1):** новой rebase-машинерии **нет** — свежесть базы гарантируют существующие механизмы: паузнутая-в-`analysis` задача при resume режет ветку отложенно (ORCH-088) от свежего `origin/main` с кодом успешника; материализованная — ребейзится на merge-gate (`auto_rebase_onto_main` под merge-lease ORCH-026/093) + re-test (ORCH-110). Нормальная задача (`paused_at IS NULL`) по-прежнему держит гейт ⇒ анти-stale-base для нормального случая (ORCH-088) **не регрессирует**. + - **Наблюдаемость (D5):** блок `serial_gate` в `GET /queue` дополнен ключом `paused` (список приостановленных незавершённых задач репо — НЕ показываются как `active_task`) и `reason` ожидания у каждого waiting-job с приоритетом `freeze` → `dependency` → `active-task` → `null`; существующие ключи снапшота (`active_task`/`waiting`/`frozen`/`frozen_reason`/`frozen_at`) — байт-в-байт (BC). + - **Условность/откат (D6):** независимый под-флаг `serial_gate_pause_enabled` (env `ORCH_SERIAL_GATE_PAUSE_ENABLED`, дефолт `True`; зеркало `serial_gate_freeze_enabled`; область переиспользует `serial_gate_repos`, новый `*_repos` не вводится). Дефолт `True` — **истинный no-op** до явной операторской паузы (`paused_at` всюду NULL). `False` ⇒ pause-терм опущен из SQL, эндпоинты no-op, serial-gate **байт-в-байт ORCH-088/090** (осознанный rollback-режим). Глубже — `serial_gate_enabled=false`. + - **Покрытие:** `tests/test_orch124_serial_gate_pause.py` (TC-01 обязательный регресс инцидента ORCH-116/ORCH-123 — красный до фикса, зелёный после; TC-02…TC-15: анти-регресс ORCH-088, durable/restart, resume, сохранность freeze/dependency, снапшот-reason, анти-дрейф 3 точек, offline hot-path, never-raise/fail-OPEN, kill-switch-нейтральность, структурный анти-регресс реестров/схем). + - **Доки:** обновлены `docs/architecture/README.md` (раздел serial-gate + ось «пауза без блокировки») и `docs/architecture/internals.md` (ось «пауза» ⊥ оси «терминальность»); сквозной ADR `adr-0051`. - **Детерминированный test-раннер вместо LLM-тестера на `testing`** (ORCH-116, `feat`): второй реализованный срез determinization-roadmap (ORCH-118 A5, `needs-hybrid-fallback`) — на стадии `testing` для self-hosting `orchestrator` **LLM-агент `tester` заменён детерминированным кодом** (`src/test_runner.py`). PASS/FAIL-ядро агента было деривируемым (регресс `pytest` + read-only smoke → `result:`); каждый прогон жёг токены/время opus-агента (~60–150k / 5–20 мин) и встраивал недетерминизм LLM в точку ветвления `testing → deploy-staging` / `testing → development`. **Инвариант (NFR-1):** это замена *продюсера* артефакта, **не** гейта — контракт `13-test-report.md`, гейт `check_tests_passed`/`_parse_tests_verdict`, `STAGE_TRANSITIONS`, machine-verdict `result:` (+ legacy `verdict:`/`status:`), схема БД — **байт-в-байт не тронуты**. Аддитивно, под kill-switch, never-raise, fail-closed, скоуп self-hosting, гибрид (LLM строго off-control-path). Эталон — `src/staging_runner.py` (ORCH-115). ADR: `docs/work-items/ORCH-116/06-adr/ADR-001-deterministic-test-runner.md`, сквозной `docs/architecture/adr/adr-0050-deterministic-test-runner.md`. - **Перехват в `launch_job` до `_spawn` (D1):** `if job.agent=="tester" and test_runner.should_intercept(job)` → `_run_test_runner_job` (зеркало `_run_staging_runner_job`, прецедент `deploy-finalizer`/`post-deploy-monitor`/`staging-runner` `launcher.py:397/402/405`): синхронно ведёт `jobs`-строку через `mark_job`, возвращает `None` (нет `agent_runs`, нет токенов). Дискриминатор — роль `tester` **И** стадия задачи `testing` (defense-in-depth: `tester` — единственный агент входа в `testing`, коллизии стадий нет, в отличие от общей роли `deployer`) **И** `applies(repo)`; `should_intercept` never-raise → `False` → штатный `_spawn` (fail-safe к LLM-пути). - **Leaf `src/test_runner.py` (новый, чистый never-raise):** по образцу `staging_runner`/`self_deploy`/`proc_group` (на импорте только `config`/`proc_group`; `db`/`git_worktree`/`self_deploy`/`qg.checks`/`stage_engine`/`notifications` — лениво). `applies(repo)` = kill-switch `test_runner_enabled` + скоуп `test_runner_repos` (пусто → self-hosting only) **И** резолв тест-контракта `_has_test_contract` (BR-9: репо без контракта → `False` → LLM-tester — enduro-trails 1:1 как до ORCH-116, даже если руками добавлен в CSV). Исполняет регресс `python -m pytest ` **в worktree ветки** (`git_worktree.get_worktree_path`, анти checkout-гонка ORCH-112) через `proc_group.run_in_process_group` (tree-kill, таймаут `test_runner_timeout_s=900`, малформ/непозитив → дефолт + WARNING) + опц. **read-only smoke** (`/health`/`/status`/`/queue` + блок `serial_gate`, stdlib `urllib`; транзиентная недостижимость — ограниченный ретрай, не-200/нет блока — немедленный FAIL; `test_runner_smoke_enabled`). Маппит exit-код **единым** контрактом `self_deploy.map_exit_code_to_status` в токенах `result:` (`0→PASS`/иначе/None→`FAIL`, fail-closed; smoke-провал AND-ится в `FAIL`); пишет `13-test-report.md` (тот же machine-key `result:` UPPERCASE + 52c-схема, `author_agent: test-runner`/`model_used: n/a`) + best-effort push в **фичеветку**; вызывает **существующий** `advance_stage(current_stage="testing", finished_agent="tester")` — без новых рёбер/исходов (transition-lease ORCH-114 берётся внутри `advance_stage` — граница O1). diff --git a/src/config.py b/src/config.py index d379ce1..2d637d4 100644 --- a/src/config.py +++ b/src/config.py @@ -1001,9 +1001,22 @@ class Settings(BaseSettings): # layer (env ORCH_SERIAL_GATE_FREEZE_ENABLED). False # -> freeze is neither set (post-deploy DEGRADED) nor # consulted in the claim gate. + # serial_gate_pause_enabled -> ORCH-124 (adr-0051 D6): independent tumbler for + # the per-task "park" axis (env + # ORCH_SERIAL_GATE_PAUSE_ENABLED). True (default) -> + # a task with tasks.paused_at NOT NULL is excluded + # from the serial-gate "active task" predicate so an + # URGENT successor may overtake a paused predecessor. + # Default is a TRUE no-op until an operator pauses a + # task (paused_at is NULL for all rows). False -> + # pause-term omitted, serial-gate is byte-for-byte + # ORCH-088/090 (deliberate rollback). Scope reuses + # serial_gate_repos (no new *_repos flag); subordinate + # to the serial_gate_enabled kill-switch. serial_gate_enabled: bool = True serial_gate_repos: str = "" serial_gate_freeze_enabled: bool = True + serial_gate_pause_enabled: bool = True # ORCH-090: STOP-status task cancellation (stop active agent + full progress # reset) and the relaunch-hole close. A new logical Plane key `stop` (fail-closed, diff --git a/src/db.py b/src/db.py index 8bf1c1f..8089400 100644 --- a/src/db.py +++ b/src/db.py @@ -147,6 +147,17 @@ def init_db(): # after a successful atomic create). Read in advance_stage for the routing-override # (skips architecture) — from the DB, NEVER from the network (NFR-4). _ensure_column(conn, "tasks", "track", "TEXT DEFAULT 'full'") + # ORCH-124 (08-data-requirements.md, ADR-001 D2): per-task durable "park" + # signal for the serial gate. Additive, idempotent (_ensure_column is a no-op + # once present) -> safe on the live shared prod DB (enduro untouched), exactly + # like tasks.cancelled_at / tasks.cancel_requested_at / tasks.track above. + # paused_at -> NULL = not paused; ISO timestamp (datetime('now')) = an + # operator explicitly parked the task (POST /serial-gate/pause). + # Read ONLY by the serial-gate "active task" predicate (ORTHOGONAL to the + # {done,cancelled} terminal axis — task_deps/stages.py do NOT read it, adr-0026 + # is untouched). All existing rows default to NULL -> pre-ORCH-124 behaviour + # holds until the first explicit operator pause. + _ensure_column(conn, "tasks", "paused_at", "TEXT") # ORCH-026 (Level B): declarative task dependencies. job_deps stores the # directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The # scheduler gate in claim_next_job keeps B queued until every A reaches @@ -776,6 +787,95 @@ def get_task_track(task_id: int) -> str: return "full" +# --------------------------------------------------------------------------- +# ORCH-124: serial-gate per-task park signal (tasks.paused_at) helpers +# --------------------------------------------------------------------------- +def set_task_paused(task_id: int) -> bool: + """ORCH-124 (ADR-001 D7): park a task for the serial gate (idempotent). + + Stamps ``tasks.paused_at=datetime('now')`` so the serial-gate "active task" + predicate stops counting this task as a FIFO blocker (an URGENT successor may + overtake it). Durable (survives restart) and DB-resolvable — the hot-claim SQL + reads it locally without any network call. Re-pausing an already-paused task + keeps the original timestamp (``WHERE paused_at IS NULL``), so the park moment + is stable. never-raise -> False on error (a write failure must not crash the + operator endpoint / worker). + """ + if task_id is None: + return False + try: + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET paused_at=datetime('now') " + "WHERE id=? AND paused_at IS NULL", + (task_id,), + ) + conn.commit() + finally: + conn.close() + return True + except Exception as e: # noqa: BLE001 - never-raise + import logging + logging.getLogger("orchestrator.db").warning( + "set_task_paused error for task %s: %s", task_id, e + ) + return False + + +def clear_task_paused(task_id: int) -> bool: + """ORCH-124 (ADR-001 D7): resume a parked task (idempotent). + + Clears ``tasks.paused_at`` back to NULL so the task re-enters the serial-gate + FIFO (holds the gate as active again, or re-enters with a deferred branch cut — + see ADR-001 D8). Resuming a task that is not paused is a no-op. never-raise -> + False on error. + """ + if task_id is None: + return False + try: + conn = get_db() + try: + conn.execute( + "UPDATE tasks SET paused_at=NULL WHERE id=?", + (task_id,), + ) + conn.commit() + finally: + conn.close() + return True + except Exception as e: # noqa: BLE001 - never-raise + import logging + logging.getLogger("orchestrator.db").warning( + "clear_task_paused error for task %s: %s", task_id, e + ) + return False + + +def is_task_paused(task_id: int) -> bool: + """ORCH-124: read whether a task is currently parked; missing/error -> False. + + Conservative fail direction (ADR-001 D9): on any read error we report "not + paused" so the task is treated as active -> the serial gate stays CLOSED rather + than wrongly opening (anti-stale-base safe). Mirror of ``get_task_track``. + """ + if task_id is None: + return False + try: + conn = get_db() + try: + row = conn.execute( + "SELECT paused_at FROM tasks WHERE id=?", (task_id,) + ).fetchone() + finally: + conn.close() + if not row: + return False + return row["paused_at"] is not None + except Exception: # noqa: BLE001 - conservative: not paused -> stays active + return False + + # --------------------------------------------------------------------------- # Telegram live tracker helpers (feat/telegram-live-tracker) # --------------------------------------------------------------------------- diff --git a/src/main.py b/src/main.py index 874c29c..5b0fb11 100644 --- a/src/main.py +++ b/src/main.py @@ -376,6 +376,84 @@ async def serial_gate_unfreeze(repo: str = ""): return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen} +@app.post("/serial-gate/pause") +async def serial_gate_pause(work_item: str = ""): + """ORCH-124 (adr-0051 D7): park a task so the serial gate stops counting it as + an active FIFO blocker — an urgent successor may overtake it. + + Explicit, durable, DB-resolvable operator intent (NOT a Plane-status gesture): + stamps ``tasks.paused_at`` so the offline hot-claim SQL reads it locally without + a network call. Pause does NOT bypass a ``repo_freeze`` or a declared dependency + (different axes) and is NOT terminal (distinct from STOP/cancel). By образцу + ``POST /serial-gate/unfreeze``; never-raise. Pausing a terminal (done/cancelled) + task is a no-op. When the pause sub-flag is off the call is a no-op + warning + (the pause-term is omitted from the gate, so a column write would be latent). + """ + from . import db + from . import serial_gate + if not work_item or not work_item.strip(): + return {"ok": False, "error": "missing 'work_item'", "work_item": work_item} + work_item = work_item.strip() + if not serial_gate._pause_layer_enabled(): + return {"ok": False, "error": "serial_gate_pause_enabled is off (no-op)", + "work_item": work_item} + task = db.get_task_by_work_item_id(work_item) + if not task: + return {"ok": False, "error": "unknown work_item", "work_item": work_item} + task_id = task["id"] + stage = task.get("stage") + if stage in ("done", "cancelled"): + return {"ok": False, "error": f"task is terminal (stage={stage})", + "work_item": work_item, "task_id": task_id, "stage": stage} + ok = db.set_task_paused(task_id) + refreshed = db.get_task_by_work_item_id(work_item) or {} + paused_at = refreshed.get("paused_at") + if ok: + try: + from .notifications import send_telegram, link_for + send_telegram( + f"⏸️ {link_for(work_item)}: задача поставлена на ПАУЗУ для serial-gate " + f"(task {task_id}, stage={stage}). Срочный успешник репо может обогнать; " + f"resume — POST /serial-gate/resume." + ) + except Exception: + pass + return {"ok": ok, "work_item": work_item, "task_id": task_id, + "stage": stage, "paused_at": paused_at} + + +@app.post("/serial-gate/resume") +async def serial_gate_resume(work_item: str = ""): + """ORCH-124 (adr-0051 D7 / AC-10): resume a parked task — it re-enters the + serial gate (holds it as active again / re-enters FIFO with the deferred branch + cut, D8). Inverse of ``POST /serial-gate/pause``; idempotent (resuming a task + that is not paused clears nothing). Anti-stale-base on resume is guaranteed by + the EXISTING mechanisms (deferred branch cut + pre-merge auto_rebase_onto_main + + merge-gate re-test, ORCH-088/093/110) — no new rebase machinery. never-raise. + """ + from . import db + if not work_item or not work_item.strip(): + return {"ok": False, "error": "missing 'work_item'", "work_item": work_item} + work_item = work_item.strip() + task = db.get_task_by_work_item_id(work_item) + if not task: + return {"ok": False, "error": "unknown work_item", "work_item": work_item} + task_id = task["id"] + was_paused = task.get("paused_at") is not None + ok = db.clear_task_paused(task_id) + if ok and was_paused: + try: + from .notifications import send_telegram, link_for + send_telegram( + f"▶️ {link_for(work_item)}: задача СНЯТА С ПАУЗЫ (task {task_id}) — " + f"снова участвует в serial-gate." + ) + except Exception: + pass + return {"ok": ok, "work_item": work_item, "task_id": task_id, + "was_paused": was_paused, "paused_at": None} + + @app.post("/transition-lease/release") async def transition_lease_release(work_item: str = ""): """ORCH-114 (adr-0045 / D10): operator manual reclaim of a stuck transition-lease. diff --git a/src/serial_gate.py b/src/serial_gate.py index 0675e98..b5bc61d 100644 --- a/src/serial_gate.py +++ b/src/serial_gate.py @@ -23,6 +23,16 @@ Two deliberately different failure directions (ADR-001 D10, NFR-1): must not wedge the queue of ALL projects (AC-8). * freeze decision (``is_repo_frozen``) -> fail-CLOSED (``True``): when we cannot confirm the ABSENCE of a freeze we keep the gate closed for prod safety (AC-9). + +ORCH-124 (adr-0051): adds an ORTHOGONAL "pause" axis to the "active task" predicate +of all three points (``build_claim_clause`` / ``repo_has_active_task`` / +``_per_repo_snapshot``). A task with ``tasks.paused_at`` NOT NULL (an operator +``POST /serial-gate/pause``) is excluded from the FIFO "active" set so an URGENT +successor may overtake a paused predecessor — fixing incident ORCH-116/ORCH-123. The +terminal set ``{done,cancelled}`` (adr-0026) is UNCHANGED; ``task_deps`` / ``stages.py`` +do NOT read ``paused_at`` (pause never bypasses a freeze or a declared dependency). +Gated by the independent sub-flag ``serial_gate_pause_enabled`` (default True is a true +no-op until the first explicit pause). """ from __future__ import annotations @@ -97,6 +107,22 @@ def _freeze_layer_enabled() -> bool: return False +def _pause_layer_enabled() -> bool: + """ORCH-124 (adr-0051 D6): whether the per-task pause axis is active. + + Independent tumbler ``serial_gate_pause_enabled`` (mirror of + ``_freeze_layer_enabled``). When True the "active task" predicate of all three + serial-gate points additionally excludes paused tasks (``paused_at IS NULL``); + when False the pause-term is omitted and serial-gate behaves byte-for-byte as + ORCH-088/090. Default True is a true no-op until an operator parks a task + (``paused_at`` is NULL for every row). never-raise -> False (pause inert). + """ + try: + return bool(getattr(settings, "serial_gate_pause_enabled", False)) + except Exception: # noqa: BLE001 + return False + + # --------------------------------------------------------------------------- # Read helpers (active task + freeze) — only the local DB # --------------------------------------------------------------------------- @@ -113,16 +139,21 @@ def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool: # ORCH-090 (adr-0026): terminal set is {done,cancelled}. A cancelled # task must NOT count as "active" or it would block the repo's serial # gate forever. + # ORCH-124 (adr-0051 D4.2): under the pause layer a PARKED task + # (paused_at NOT NULL) is likewise NOT "active" — it must not hold the + # FIFO gate against an urgent successor. Same predicate as the hot SQL + # (D4.1) and the snapshot (D4.3) so the three points never drift (TR-7). + pause_term = " AND paused_at IS NULL" if _pause_layer_enabled() else "" if exclude_task_id is not None: row = conn.execute( "SELECT 1 FROM tasks WHERE repo=? AND id != ? " - "AND stage NOT IN ('done','cancelled') LIMIT 1", + f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1", (repo, exclude_task_id), ).fetchone() else: row = conn.execute( "SELECT 1 FROM tasks WHERE repo=? " - "AND stage NOT IN ('done','cancelled') LIMIT 1", + f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1", (repo,), ).fetchone() return row is not None @@ -271,10 +302,18 @@ def build_claim_clause() -> str: repo_scope = "" # ORCH-090 (adr-0026): {done,cancelled} are both terminal — an EARLIER # cancelled task no longer holds the FIFO serial gate closed. + # ORCH-124 (adr-0051 D4.1): under the pause layer an EARLIER PARKED task + # (paused_at NOT NULL) also no longer holds the FIFO gate — an urgent + # successor may overtake it. The pause-term is appended INSIDE the existing + # EXISTS subquery (no extra JOIN/EXISTS), reads only the local DB (offline + # hot path, NFR-2), and is built inside the same try/except so any error in + # the pause sub-expression still fails-OPEN (D9). pause off / kill-switch -> + # pause_term is "" -> the clause is byte-for-byte ORCH-088/090. + pause_term = " AND t2.paused_at IS NULL" if _pause_layer_enabled() else "" active_clause = ( "EXISTS (SELECT 1 FROM tasks t2 " "WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id " - "AND t2.stage NOT IN ('done','cancelled')) " + f"AND t2.stage NOT IN ('done','cancelled'){pause_term}) " ) if _freeze_layer_enabled(): freeze_clause = ( @@ -329,23 +368,91 @@ def _known_repos() -> list[str]: return sorted(repos) +def _waiting_reason(conn, repo: str, task_id: int | None, *, + frozen: bool, pause_on: bool, deps_on: bool) -> str | None: + """ORCH-124 (adr-0051 D5): why an analyst-job is NOT claimable, or None. + + Priority order (matches the precedence of the actual claim gates): + ``freeze`` (active repo_freeze) -> ``dependency`` (an unfinished declared + job_deps predecessor, only when task_deps is on) -> ``active-task`` (an EARLIER + NON-paused unfinished task holds the FIFO gate) -> ``None`` (claimable). A + paused predecessor is deliberately NOT a reason — by design it does NOT block, + so it surfaces only via the snapshot's ``paused`` list, never here. never-raise + -> None on error (observability only, conservative). + """ + try: + if frozen: + return "freeze" + if deps_on and task_id is not None: + dep = conn.execute( + "SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " + "WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled') LIMIT 1", + (task_id,), + ).fetchone() + if dep is not None: + return "dependency" + if task_id is not None: + pause_term = " AND paused_at IS NULL" if pause_on else "" + earlier = conn.execute( + "SELECT 1 FROM tasks WHERE repo=? AND id < ? " + f"AND stage NOT IN ('done','cancelled'){pause_term} LIMIT 1", + (repo, task_id), + ).fetchone() + if earlier is not None: + return "active-task" + return None + except Exception: # noqa: BLE001 - observability only + return None + + def _per_repo_snapshot(repo: str) -> dict: """Per-repo gate state for the /queue snapshot (never raises here).""" active_task = None waiting: list[dict] = [] + paused: list[dict] = [] + # ORCH-124 (adr-0051 D5): compute frozen up-front so the per-job reason can be + # derived in the same pass. is_repo_frozen uses its own connection (separate + # from the snapshot conn below). + frozen = is_repo_frozen(repo) + pause_on = _pause_layer_enabled() + try: + deps_on = bool(getattr(settings, "task_deps_enabled", False)) + except Exception: # noqa: BLE001 + deps_on = False try: conn = db.get_db() try: # ORCH-090 (adr-0026): terminal set {done,cancelled}. + # ORCH-124 (adr-0051 D4.3): a PARKED task is excluded from active_task + # (same predicate as build_claim_clause / repo_has_active_task — no + # drift, TR-7); it surfaces in the additive `paused` list instead. + pause_term = " AND paused_at IS NULL" if pause_on else "" row = conn.execute( "SELECT work_item_id, stage FROM tasks " - "WHERE repo=? AND stage NOT IN ('done','cancelled') ORDER BY id LIMIT 1", + f"WHERE repo=? AND stage NOT IN ('done','cancelled'){pause_term} " + "ORDER BY id LIMIT 1", (repo,), ).fetchone() if row: active_task = {"work_item_id": row["work_item_id"], "stage": row["stage"]} + # ORCH-124: additive `paused` list — non-terminal parked tasks of the + # repo (visible, but NOT counted as active_task). Only meaningful while + # the pause layer is on. + if pause_on: + for pr in conn.execute( + "SELECT work_item_id, stage, paused_at FROM tasks " + "WHERE repo=? AND stage NOT IN ('done','cancelled') " + "AND paused_at IS NOT NULL ORDER BY id", + (repo,), + ).fetchall(): + paused.append({ + "work_item_id": pr["work_item_id"], + "stage": pr["stage"], + "paused_at": pr["paused_at"], + }) for j in conn.execute( - "SELECT j.id AS job_id, t.work_item_id AS work_item_id, t.stage AS stage " + "SELECT j.id AS job_id, j.task_id AS task_id, " + "t.work_item_id AS work_item_id, t.stage AS stage " "FROM jobs j LEFT JOIN tasks t ON t.id = j.task_id " "WHERE j.repo=? AND j.status='queued' AND j.agent='analyst' " "ORDER BY j.id", @@ -355,12 +462,17 @@ def _per_repo_snapshot(repo: str) -> dict: "job_id": j["job_id"], "work_item_id": j["work_item_id"], "stage": j["stage"], + # ORCH-124 (D5): why this job is held (freeze/dependency/ + # active-task) or None when claimable. + "reason": _waiting_reason( + conn, repo, j["task_id"], + frozen=frozen, pause_on=pause_on, deps_on=deps_on, + ), }) finally: conn.close() except Exception as e: # noqa: BLE001 logger.warning("serial_gate per-repo snapshot error for %s: %s", repo, e) - frozen = is_repo_frozen(repo) frozen_reason = None frozen_at = None if frozen: @@ -374,6 +486,8 @@ def _per_repo_snapshot(repo: str) -> dict: return { "active_task": active_task, "waiting": waiting, + # ORCH-124 (D5): additive — parked predecessors (not shown as active_task). + "paused": paused, "frozen": frozen, "frozen_reason": frozen_reason, "frozen_at": frozen_at, diff --git a/tests/test_orch124_serial_gate_pause.py b/tests/test_orch124_serial_gate_pause.py new file mode 100644 index 0000000..31b092f --- /dev/null +++ b/tests/test_orch124_serial_gate_pause.py @@ -0,0 +1,353 @@ +"""ORCH-124 — serial-gate wait/pause semantics (real tmp SQLite, no network). + +A paused predecessor must NOT block an urgent successor's analyst-job, while a +normally-executing predecessor still holds the FIFO gate (anti-stale-base ORCH-088 +preserved). Covers 04-test-plan.yaml TC-01…TC-15. The behaviour (not the exact SQL) +is asserted: pause is an explicit, durable, DB-resolvable per-task signal +(``tasks.paused_at``) that the offline hot-claim SQL reads locally. + + TC-01 REGRESS (mandatory): earlier PAUSED task A + later urgent B -> claim picks + B's analyst-job (gate open). Reproduces incident ORCH-116/ORCH-123. + TC-02 Predecessor parked (Backlog intent) -> build_claim_clause does NOT block B. + TC-03 Predecessor parked at another wait-stage (Needs-Input intent) -> still open. + TC-04 ANTI-REGRESS ORCH-088: a NON-paused unfinished predecessor STILL blocks B. + TC-05 Pause needs explicit durable intent; unpaused non-terminal task stays active. + TC-06 Durable: the pause signal survives a connection/restart (read from the DB). + TC-07 Resume restores participation in the gate (no eternal bypass). + TC-08 Explicit blocks kept: an active repo_freeze still gates B (pause != bypass). + TC-09 Explicit blocks kept: an unfinished declared dependency still gates B. + TC-10 /queue snapshot: paused task not shown as active_task; reason is correct. + TC-11 Three points agree on "active" (anti-drift): clause / mirror / snapshot. + TC-12 Hot-path offline: claim resolves pause with no network (Plane not consulted). + TC-13 never-raise / fail-directions: pause error -> build_claim_clause fail-OPEN. + TC-14 Kill-switch: pause sub-flag off -> byte-for-byte ORCH-088/090 (paused blocks). + TC-15 Structural anti-regress: STAGE_TRANSITIONS / QG_CHECKS / table schemas intact. +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch124_pause.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import serial_gate # noqa: E402 +from src import config as cfg # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "pause.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + # Serial gate ON; freeze layer ON; pause layer ON; empty CSV (all repos). + monkeypatch.setattr(cfg.settings, "serial_gate_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_repos", "", raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_freeze_enabled", True, raising=False) + monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", True, raising=False) + # Keep the unrelated dep-gate inert unless a test opts in. + monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="analysis", repo="orchestrator"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) " + "VALUES (?, ?, ?, ?, ?, ?)", + (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage, work_item_id), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +# --------------------------------------------------------------- TC-01 +def test_paused_predecessor_does_not_block_urgent_successor(): + """REGRESS (ORCH-116/ORCH-123): earlier PAUSED A must not gate urgent B.""" + a = _make_task("ORCH-116", stage="development") # earlier predecessor + b = _make_task("ORCH-123", stage="analysis") # later urgent task + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + + # Before the pause A holds the FIFO gate -> B is blocked (the incident state). + assert claim_next_job() is None, "active A gates B (pre-pause, FIFO ORCH-088)" + + # Operator parks A. Now B's analyst-job must become claimable. + assert db.set_task_paused(a) is True + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "a PAUSED predecessor must not gate the urgent successor (AC-1)" + ) + + +# --------------------------------------------------------------- TC-02 +def test_parked_backlog_predecessor_not_active_in_clause(): + a = _make_task("ORCH-201", stage="analysis") # "Backlog" intent + b = _make_task("ORCH-202", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + assert "paused_at IS NULL" in serial_gate.build_claim_clause() + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +# --------------------------------------------------------------- TC-03 +def test_parked_needs_input_predecessor_not_active(): + # Another wait-stage (review ~ "Needs-Input" intent) — same park column. + a = _make_task("ORCH-203", stage="review") + b = _make_task("ORCH-204", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +# --------------------------------------------------------------- TC-04 +def test_non_paused_predecessor_still_blocks_fifo(): + """ANTI-REGRESS ORCH-088: a normally-executing A still gates B.""" + _make_task("ORCH-210", stage="development") # NOT paused + b = _make_task("ORCH-211", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + assert claim_next_job() is None, ( + "a non-paused unfinished predecessor must STILL hold the gate (FIFO intact)" + ) + + +# --------------------------------------------------------------- TC-05 +def test_pause_requires_explicit_durable_intent(): + a = _make_task("ORCH-215", stage="development") + b = _make_task("ORCH-216", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + # No explicit pause -> A is active -> gate held (no heuristic auto-pause). + assert db.is_task_paused(a) is False + assert claim_next_job() is None + # The pause signal is DB-resolvable once set explicitly. + db.set_task_paused(a) + assert db.is_task_paused(a) is True + + +# --------------------------------------------------------------- TC-06 +def test_pause_signal_is_durable_across_restart(): + a = _make_task("ORCH-220", stage="development") + b = _make_task("ORCH-221", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + # Simulate a restart: drop in-memory state, re-run the idempotent migration. + init_db() + assert db.is_task_paused(a) is True, "pause must survive restart (read from DB)" + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +# --------------------------------------------------------------- TC-07 +def test_resume_restores_gate_participation(): + a = _make_task("ORCH-225", stage="development") + b = _make_task("ORCH-226", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + assert claim_next_job() is not None # B claimable while A paused + # Re-queue a fresh analyst-job for B (the previous one was claimed) and resume A. + conn = get_db() + conn.execute("UPDATE jobs SET status='queued', started_at=NULL WHERE task_id=?", (b,)) + conn.commit() + conn.close() + assert db.clear_task_paused(a) is True + assert db.is_task_paused(a) is False + assert claim_next_job() is None, ( + "after resume A holds the gate again — no eternal bypass (AC-10)" + ) + + +# --------------------------------------------------------------- TC-08 +def test_pause_does_not_bypass_freeze(): + _make_task("ORCH-230", stage="done") # nothing unfinished + a = _make_task("ORCH-231", stage="development") + b = _make_task("ORCH-232", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + # Freeze the repo: even with A paused, B must stay blocked by the freeze. + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-230") + assert claim_next_job() is None, "an active freeze gates B; pause must not bypass it" + # Clearing the freeze (A still paused) -> B becomes claimable. + serial_gate.clear_repo_freeze("orchestrator") + assert claim_next_job() is not None + + +# --------------------------------------------------------------- TC-09 +def test_pause_does_not_bypass_declared_dependency(monkeypatch): + monkeypatch.setattr(cfg.settings, "task_deps_enabled", True, raising=False) + a = _make_task("ORCH-240", stage="development") + b = _make_task("ORCH-241", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + assert db.add_dependency(b, a) is True # B blocked-by A + db.set_task_paused(a) + # task_deps reads the {done,cancelled} terminal only (NOT paused_at): an + # unfinished declared dependency keeps B blocked even though A is paused. + assert claim_next_job() is None, ( + "a declared unfinished dependency gates B; pause must not bypass it (AC-5)" + ) + # Once A is terminal the dependency is satisfied -> B is claimable. + conn = get_db() + conn.execute("UPDATE tasks SET stage='done' WHERE id=?", (a,)) + conn.commit() + conn.close() + assert claim_next_job() is not None + + +# --------------------------------------------------------------- TC-10 +def test_snapshot_reason_and_paused_list(): + a = _make_task("ORCH-250", stage="development") + b = _make_task("ORCH-251", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + + # (a) A active (not paused) -> B waits with reason 'active-task'; A is active_task. + per = serial_gate.snapshot()["per_repo"]["orchestrator"] + assert per["active_task"]["work_item_id"] == "ORCH-250" + assert per["paused"] == [] + wb = next(w for w in per["waiting"] if w["job_id"] == job_b) + assert wb["reason"] == "active-task" + # Existing keys preserved (BC). + assert set(per) >= {"active_task", "waiting", "frozen", "frozen_reason", "frozen_at"} + + # (b) Pause A -> A no longer active_task; it appears in `paused`; B is claimable + # (reason None — a paused predecessor is by design NOT a wait reason). + db.set_task_paused(a) + per = serial_gate.snapshot()["per_repo"]["orchestrator"] + assert per["active_task"] is None or per["active_task"]["work_item_id"] != "ORCH-250" + assert any(p["work_item_id"] == "ORCH-250" for p in per["paused"]) + wb = next(w for w in per["waiting"] if w["job_id"] == job_b) + assert wb["reason"] is None + + # (c) Freeze -> reason 'freeze' (highest priority). + serial_gate.set_repo_freeze("orchestrator", "DEGRADED", "ORCH-250") + per = serial_gate.snapshot()["per_repo"]["orchestrator"] + wb = next(w for w in per["waiting"] if w["job_id"] == job_b) + assert wb["reason"] == "freeze" + + +# --------------------------------------------------------------- TC-11 +def test_three_points_agree_on_active(): + """Anti-drift: clause / mirror / snapshot classify predecessor A identically. + + B is excluded from the mirror (``exclude_task_id=b``) to mirror the clause's + own-row exclusion (``t2.id < jobs.task_id``), so the three points are asked the + SAME question: "does the non-B predecessor A count as an active blocker?". + """ + a = _make_task("ORCH-260", stage="development") + b = _make_task("ORCH-261", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + + # A NOT paused -> all three say A is active. + assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is True + assert (serial_gate.snapshot()["per_repo"]["orchestrator"]["active_task"] + ["work_item_id"] == "ORCH-260") + assert claim_next_job() is None # clause blocks B on A + + # A paused -> all three agree A is NOT active (consistent, no drift). + db.set_task_paused(a) + assert serial_gate.repo_has_active_task("orchestrator", exclude_task_id=b) is False + snap = serial_gate.snapshot()["per_repo"]["orchestrator"] + active = snap["active_task"] + assert active is None or active["work_item_id"] != "ORCH-260" + assert any(p["work_item_id"] == "ORCH-260" for p in snap["paused"]) + assert claim_next_job() is not None # clause now opens for B + + +# --------------------------------------------------------------- TC-12 +def test_hot_path_is_offline(): + """The pause predicate resolves from the local DB only — no network.""" + a = _make_task("ORCH-270", stage="development") + b = _make_task("ORCH-271", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + # Functional: claim works with no Plane configured/reachable. + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + # Structural: the gate leaf imports no network client (offline hot path, NFR-2). + import inspect + src = inspect.getsource(serial_gate) + for forbidden in ("import httpx", "import requests", "plane_sync", "urllib.request"): + assert forbidden not in src, f"serial_gate must stay offline (found {forbidden!r})" + + +# --------------------------------------------------------------- TC-13 +def test_pause_error_fails_open_and_never_raises(monkeypatch): + _make_task("ORCH-280", stage="development") # would close the gate + b = _make_task("ORCH-281", stage="analysis") + job_b = enqueue_job("analyst", "orchestrator", "B", task_id=b) + + def _boom(): + raise RuntimeError("pause layer probe down") + + monkeypatch.setattr(serial_gate, "_pause_layer_enabled", _boom, raising=True) + # build_claim_clause must fail-OPEN ('' fragment) — never raise, never wedge. + assert serial_gate.build_claim_clause() == "" + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b, ( + "a pause-layer error must fail-OPEN, not wedge the queue (AC-9)" + ) + # The other public functions degrade conservatively without raising. + assert serial_gate.repo_has_active_task("orchestrator") in (True, False) + assert isinstance(serial_gate.snapshot(), dict) + # Freeze direction is NOT inverted by a pause error (still fail-CLOSED on doubt). + monkeypatch.setattr( + serial_gate, "_active_freeze_row", + lambda repo: (_ for _ in ()).throw(RuntimeError("freeze read down")), + raising=True, + ) + assert serial_gate.is_repo_frozen("orchestrator") is True + # The DB mutators/readers never raise on bad input either. + assert db.set_task_paused(None) is False + assert db.clear_task_paused(None) is False + assert db.is_task_paused(None) is False + + +# --------------------------------------------------------------- TC-14 +def test_kill_switch_off_is_byte_for_byte_orch088(monkeypatch): + monkeypatch.setattr(cfg.settings, "serial_gate_pause_enabled", False, raising=False) + a = _make_task("ORCH-290", stage="development") + b = _make_task("ORCH-291", stage="analysis") + enqueue_job("analyst", "orchestrator", "B", task_id=b) + db.set_task_paused(a) + # Pause sub-flag OFF -> the pause-term is omitted -> a paused task STILL counts + # as active (deliberate ORCH-088/090 rollback behaviour). + assert "paused_at" not in serial_gate.build_claim_clause() + assert claim_next_job() is None, ( + "with the pause sub-flag off serial-gate is byte-for-byte ORCH-088/090" + ) + # Outside the (empty) repo scope nothing changes for enduro either. + et = _make_task("ET-290", stage="analysis", repo="enduro-trails") + job_et = enqueue_job("analyst", "enduro-trails", "B", task_id=et) + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_et + + +# --------------------------------------------------------------- TC-15 +def test_registries_and_schemas_unchanged(): + from src.stages import STAGE_TRANSITIONS + from src.qg.checks import QG_CHECKS + # ORCH-124 is a scheduler-only change: no new edge, no new terminal sink. + assert set(STAGE_TRANSITIONS) == { + "created", "analysis", "architecture", "development", "review", + "testing", "deploy-staging", "deploy", "done", "cancelled", + } + # No serial-gate / pause QG check was introduced (the gate is a scheduler cond). + assert not any("serial" in k or "pause" in k for k in QG_CHECKS) + # Existing table schemas intact; tasks gained the additive paused_at column. + conn = get_db() + try: + task_cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()} + job_cols = {r[1] for r in conn.execute("PRAGMA table_info(jobs)").fetchall()} + dep_cols = {r[1] for r in conn.execute("PRAGMA table_info(job_deps)").fetchall()} + frz_cols = {r[1] for r in conn.execute("PRAGMA table_info(repo_freeze)").fetchall()} + finally: + conn.close() + assert "paused_at" in task_cols # additive + assert {"id", "repo", "stage", "work_item_id"}.issubset(task_cols) + assert {"id", "agent", "repo", "status", "task_id"}.issubset(job_cols) + assert {"task_id", "depends_on_task_id"}.issubset(dep_cols) + assert {"repo", "frozen_at", "cleared_at"}.issubset(frz_cols)