"""ORCH-090 (ADR-001 D9 / adr-0026): STOP-cancellation leaf — pure decision logic. Leaf module mirroring ``src/serial_gate.py`` / ``src/labels.py``: pure, unit-testable, never-raise functions over config + the existing DB / deploy-state. Module-level imports are limited to ``config`` (and ``re``); the critical-window probe lazily imports ``self_deploy`` / ``merge_gate`` / ``db`` so a cycle can never form and an import failure degrades safely. What it answers: * ``applies(repo)`` — is STOP-cancellation REAL for this repo? * ``in_critical_window(task)``— is the task inside an irreversible merge/deploy step where cancellation must be DEFERRED (ADR-001 D7) instead of applied now? * ``snapshot()`` — read-only summary for ``GET /queue`` (AC-10). The ORCHESTRATION of a cancellation (SIGTERM, cancel-jobs, worktree/branch cleanup, key tombstone, notifications) lives in ``stage_engine.cancel_task`` — this leaf only decides, it never mutates. never-raise contract (self-hosting safety): every public function degrades conservatively. ``applies`` -> False on error (gate inert, the kill-switch-off default). ``in_critical_window`` -> True on doubt (fail-CLOSED: when we cannot confirm we are OUTSIDE a critical window, DEFER cancellation rather than risk tearing a half-merge / detached prod deploy, NFR-3 / TR-3). """ from __future__ import annotations import logging import re from .config import settings logger = logging.getLogger("orchestrator.cancel") # Repo tokens in the CSV scope must match this (mirrors serial_gate._REPO_TOKEN). _REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$") def _scope_repos() -> set[str]: """Sanitised set of in-scope repo tokens from ``stop_status_repos`` (CSV). Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D9). Invalid tokens (regex miss) are dropped. Never raises. """ try: raw = (settings.stop_status_repos or "").strip() except Exception: # noqa: BLE001 return set() if not raw: return set() out: set[str] = set() for tok in raw.split(","): t = tok.strip() if t and _REPO_TOKEN.match(t): out.add(t) elif t: logger.warning("cancel: dropping invalid repo token %r from CSV", t) return out def applies(repo: str) -> bool: """Whether STOP-cancellation is REAL for this repo (D9 / AC-8). * ``stop_status_enabled=False`` -> always False (kill-switch; STOP handling and the relaunch-hole gate are 1:1 as before ORCH-090). * ``stop_status_repos`` (CSV) non-empty -> real only for listed repos. * empty CSV -> real for ALL repos (cancellation is meaningful for enduro too). Never raises -> False on error (degrade to "inert", matching kill-switch off). """ try: if not getattr(settings, "stop_status_enabled", False): return False scope = _scope_repos() if scope: return (repo or "").strip() in scope return True except Exception as e: # noqa: BLE001 - never-raise logger.warning("cancel.applies error for %s: %s", repo, e) return False def _task_has_running_actor(task_id) -> bool: """True iff the task currently has a RUNNING job — an active merge/deploy actor. Distinguishes a genuinely in-flight merge/deploy (a running deployer / deploy finalizer job actually executing the irreversible step) from a task merely PARKED on ``deploy`` awaiting the human ``Confirm Deploy`` (the merge-lease is held across that wait, ORCH-036/043, but nothing is executing and nothing has been merged/deployed). Lazily imports ``db``; raises on a db error so the caller fails CLOSED (treat as critical) rather than silently mis-classifying on doubt. """ if not task_id: return False from . import db for job in db.get_active_jobs_for_task(task_id): if job.get("status") == "running": return True return False def in_critical_window(task: dict) -> bool: """Is the task inside an irreversible merge/deploy step (ADR-001 D7 / AC-7)? A STOP that lands here must NOT tear the step apart (half-merge / detached prod deploy / dead prod container, NFR-3). Markers (existing, no new state): * self-deploy Phase B initiated — the ``INITIATED`` sentinel in ``/.deploy-state-//`` (ORCH-036) — the detached prod deploy + the deterministic ``merge_pr`` (``_handle_merge_verify``, run later under the SAME marker) are both covered here; * the task HOLDS the per-repo merge-lease ``/.merge-lease-.json`` (ORCH-043), holder branch == task branch, **AND** a merge/deploy actor is actually RUNNING. The merge-lease branch is gated on a running actor on purpose (ORCH-090 review P1 fix). For the self-hosting repo the lease is HELD from the merge-gate PASS (``deploy-staging -> deploy`` edge) right through to ``deploy -> done`` — including the whole time the task sits PARKED on ``deploy`` awaiting a human ``Confirm Deploy`` (Phase A). That wait is FULLY REVERSIBLE: nothing is merged or deployed (the irreversible ``merge_pr`` only runs later in ``_handle_merge_verify``, always under an ``INITIATED`` marker already caught above). Classifying that idle parking as "critical" used to DEFER the cancel to a deploy finalizer that the operator — having pressed STOP precisely to NOT confirm — never triggers, so the cancel was never applied and the task wedged while still holding the lease (blocking the repo's serial-gate / merges). Now idle parking (lease held, no running actor) is NOT critical: the full reset runs immediately and itself releases the lease. fail-CLOSED (TR-3): any error/uncertainty -> True (DEFER cancellation). Outside the window -> False (apply the full reset immediately). """ if not task: return False repo = task.get("repo") work_item_id = task.get("work_item_id") branch = task.get("branch") try: from . import self_deploy if self_deploy.has_marker(repo, work_item_id, self_deploy.INITIATED): return True except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt logger.warning("cancel.in_critical_window self_deploy probe error: %s", e) return True try: from . import merge_gate holder = merge_gate.current_lease_holder(repo) if holder and branch and holder == branch: # Lease held. Critical ONLY if an actor is actively merging/deploying; # an idle task parked on `deploy` awaiting Confirm Deploy is reversible. if _task_has_running_actor(task.get("id")): return True logger.info( "cancel.in_critical_window: task %s holds the merge-lease but no " "actor is running (idle deploy parking, awaiting Confirm Deploy) -> " "NOT critical; full reset will release the lease", task.get("id"), ) return False except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt logger.warning("cancel.in_critical_window merge-lease probe error: %s", e) return True return False def snapshot() -> dict: """Read-only STOP-cancellation summary for GET /queue (AC-10). Additive block; existing /queue keys are untouched. never-raise -> a minimal dict with the flags on error. """ try: enabled = bool(getattr(settings, "stop_status_enabled", False)) except Exception: # noqa: BLE001 enabled = False try: repos_cfg = getattr(settings, "stop_status_repos", "") or "" except Exception: # noqa: BLE001 repos_cfg = "" try: from . import db stats = db.cancelled_tasks_snapshot(10) except Exception as e: # noqa: BLE001 - never-raise logger.warning("cancel.snapshot error: %s", e) stats = {"count": 0, "pending": 0, "recent": []} return { "enabled": enabled, "repos": repos_cfg, "cancelled_count": stats.get("count", 0), "deferred_pending": stats.get("pending", 0), "recent": stats.get("recent", []), }