feat(cancel): STOP-status task cancellation + relaunch-hole close (ORCH-090)
Introduce the dedicated Plane STOP status as a single declarative task-cancel
mechanism: stop the active agent (graceful SIGTERM cascade), cancel all jobs
(terminal `cancelled`, never requeued), remove the worktree + delete the remote
feature branch (never main, never force-push), drive the task to the new
system-terminal state `cancelled` and tombstone the natural keys so a later
"To Analyse" re-creates it from scratch (docs artefacts preserved). STOP during a
critical merge/deploy window is deferred until the irreversible step finishes
honestly. Also closes the relaunch hole: handle_status_start relaunch is gated to
the `analysis` stage; the only pipeline-start entry point remains "To Analyse".
Cross-cutting (adr-0026): the "task terminal" predicate is widened {done} ->
{done, cancelled} in serial_gate / task_deps / stages sink + reaper/worker
requeue guards. STAGE_TRANSITIONS exit-gates / QG_CHECKS / check_* are unchanged
(`cancelled` is a sink, not a new edge). Additive, never-raise, restart-safe,
under kill-switch ORCH_STOP_STATUS_ENABLED (off -> zero regression).
New: src/cancel.py (leaf), src/gitea.py (delete_remote_branch), tasks columns
cancelled_at/cancel_requested_at, jobs status `cancelled`, GET /queue `stop` block.
Tests: tests/test_stop_status.py (TC-01..TC-14 + D7); full suite green (1345).
Docs updated in-PR (architecture README, CLAUDE.md, README.md, .env.example,
CHANGELOG). ADR-001 D4 refinement: plane_issue_id is tombstoned too (the lookup
ORs on it) — original UUID recoverable from the parseable suffix.
Refs: ORCH-090
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -679,17 +679,47 @@ class AgentLauncher:
|
||||
if timeout is None:
|
||||
timeout = self._resolve_timeout(agent)
|
||||
time.sleep(timeout)
|
||||
# ORCH-090: the SIGTERM->grace->SIGKILL cascade is now a reusable helper
|
||||
# (stop_process) shared with the STOP-cancellation path. The timeout
|
||||
# watchdog just sleeps the timeout, then drives the cascade.
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} exceeded {timeout}s timeout (pid={pid})"
|
||||
)
|
||||
self.stop_process(pid, run_id, reason=f"timeout>{timeout}s")
|
||||
|
||||
def stop_process(self, pid: int, run_id: int | None, *, reason: str = "stop") -> bool:
|
||||
"""ORCH-7 / ORCH-090 (ADR-001 D2): graceful SIGTERM->grace->SIGKILL cascade.
|
||||
|
||||
Extracted from ``_watchdog`` so the STOP-cancellation path
|
||||
(``stage_engine.cancel_task``) stops an active agent through the SAME
|
||||
graceful cascade instead of a new "dirty" kill (AC-1). Send SIGTERM, give
|
||||
the process up to ``settings.agent_kill_grace_seconds`` to flush and exit,
|
||||
SIGKILL only if it is still alive after the grace; stamp ``agent_runs``
|
||||
exit_code=-9 via ``_record_kill`` whenever a kill actually happened.
|
||||
|
||||
never-raise; ``ProcessLookupError`` is tolerated at every step (the process
|
||||
may already be gone). Returns True iff a SIGTERM was delivered to a live
|
||||
process; False when the process was already gone (no record — the monitor's
|
||||
``proc.wait()`` owns that exit).
|
||||
"""
|
||||
if pid is None:
|
||||
return False
|
||||
# Phase 1: SIGTERM (graceful). If the process is already gone, we're done.
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} exceeded {timeout}s timeout: sent SIGTERM "
|
||||
f"(pid={pid}), grace={settings.agent_kill_grace_seconds}s"
|
||||
f"stop_process ({reason}): sent SIGTERM to pid={pid} "
|
||||
f"(run_id={run_id}), grace={settings.agent_kill_grace_seconds}s"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
logger.info(f"Agent run_id={run_id} already exited before SIGTERM")
|
||||
return # nothing to record: the monitor's proc.wait() owns the exit
|
||||
logger.info(
|
||||
f"stop_process ({reason}): pid={pid} already exited "
|
||||
f"(run_id={run_id}); nothing to record"
|
||||
)
|
||||
return False
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning(f"stop_process SIGTERM error pid={pid}: {e}")
|
||||
return False
|
||||
|
||||
# Phase 2: poll for graceful exit within the grace window.
|
||||
grace = settings.agent_kill_grace_seconds
|
||||
@@ -702,21 +732,27 @@ class AgentLauncher:
|
||||
os.kill(pid, 0) # signal 0 = liveness probe, does not kill
|
||||
except ProcessLookupError:
|
||||
logger.info(
|
||||
f"Agent run_id={run_id} exited gracefully after SIGTERM "
|
||||
f"({waited:.1f}s); no SIGKILL needed"
|
||||
f"stop_process ({reason}): pid={pid} exited gracefully after "
|
||||
f"SIGTERM ({waited:.1f}s); no SIGKILL needed"
|
||||
)
|
||||
self._record_kill(run_id)
|
||||
return
|
||||
return True
|
||||
except Exception: # noqa: BLE001 - probe error -> escalate to SIGKILL
|
||||
break
|
||||
|
||||
# Phase 3: still alive -> hard SIGKILL.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
logger.warning(
|
||||
f"Agent run_id={run_id} did not exit within {grace}s grace: sent SIGKILL"
|
||||
f"stop_process ({reason}): pid={pid} did not exit within {grace}s "
|
||||
f"grace: sent SIGKILL"
|
||||
)
|
||||
except ProcessLookupError:
|
||||
logger.info(f"Agent run_id={run_id} exited just before SIGKILL")
|
||||
logger.info(f"stop_process ({reason}): pid={pid} exited just before SIGKILL")
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning(f"stop_process SIGKILL error pid={pid}: {e}")
|
||||
self._record_kill(run_id)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _record_kill(run_id: int):
|
||||
|
||||
144
src/cancel.py
Normal file
144
src/cancel.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""ORCH-090 (ADR-001 D9 / adr-0026): STOP-cancellation leaf — pure decision logic.
|
||||
|
||||
Leaf module mirroring ``src/serial_gate.py`` / ``src/labels.py``: pure,
|
||||
unit-testable, never-raise functions over config + the existing DB / deploy-state.
|
||||
Module-level imports are limited to ``config`` (and ``re``); the critical-window
|
||||
probe lazily imports ``self_deploy`` / ``merge_gate`` so a cycle can never form and
|
||||
an import failure degrades safely.
|
||||
|
||||
What it answers:
|
||||
* ``applies(repo)`` — is STOP-cancellation REAL for this repo?
|
||||
* ``in_critical_window(task)``— is the task inside an irreversible merge/deploy
|
||||
step where cancellation must be DEFERRED (ADR-001 D7) instead of applied now?
|
||||
* ``snapshot()`` — read-only summary for ``GET /queue`` (AC-10).
|
||||
|
||||
The ORCHESTRATION of a cancellation (SIGTERM, cancel-jobs, worktree/branch
|
||||
cleanup, key tombstone, notifications) lives in ``stage_engine.cancel_task`` — this
|
||||
leaf only decides, it never mutates.
|
||||
|
||||
never-raise contract (self-hosting safety): every public function degrades
|
||||
conservatively. ``applies`` -> False on error (gate inert, the kill-switch-off
|
||||
default). ``in_critical_window`` -> True on doubt (fail-CLOSED: when we cannot
|
||||
confirm we are OUTSIDE a critical window, DEFER cancellation rather than risk
|
||||
tearing a half-merge / detached prod deploy, NFR-3 / TR-3).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.cancel")
|
||||
|
||||
# Repo tokens in the CSV scope must match this (mirrors serial_gate._REPO_TOKEN).
|
||||
_REPO_TOKEN = re.compile(r"^[A-Za-z0-9._-]+$")
|
||||
|
||||
|
||||
def _scope_repos() -> set[str]:
|
||||
"""Sanitised set of in-scope repo tokens from ``stop_status_repos`` (CSV).
|
||||
|
||||
Empty/blank CSV -> empty set, meaning "apply to ALL repos" (D9). Invalid tokens
|
||||
(regex miss) are dropped. Never raises.
|
||||
"""
|
||||
try:
|
||||
raw = (settings.stop_status_repos or "").strip()
|
||||
except Exception: # noqa: BLE001
|
||||
return set()
|
||||
if not raw:
|
||||
return set()
|
||||
out: set[str] = set()
|
||||
for tok in raw.split(","):
|
||||
t = tok.strip()
|
||||
if t and _REPO_TOKEN.match(t):
|
||||
out.add(t)
|
||||
elif t:
|
||||
logger.warning("cancel: dropping invalid repo token %r from CSV", t)
|
||||
return out
|
||||
|
||||
|
||||
def applies(repo: str) -> bool:
|
||||
"""Whether STOP-cancellation is REAL for this repo (D9 / AC-8).
|
||||
|
||||
* ``stop_status_enabled=False`` -> always False (kill-switch; STOP handling and
|
||||
the relaunch-hole gate are 1:1 as before ORCH-090).
|
||||
* ``stop_status_repos`` (CSV) non-empty -> real only for listed repos.
|
||||
* empty CSV -> real for ALL repos (cancellation is meaningful for enduro too).
|
||||
Never raises -> False on error (degrade to "inert", matching kill-switch off).
|
||||
"""
|
||||
try:
|
||||
if not getattr(settings, "stop_status_enabled", False):
|
||||
return False
|
||||
scope = _scope_repos()
|
||||
if scope:
|
||||
return (repo or "").strip() in scope
|
||||
return True
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel.applies error for %s: %s", repo, e)
|
||||
return False
|
||||
|
||||
|
||||
def in_critical_window(task: dict) -> bool:
|
||||
"""Is the task inside an irreversible merge/deploy step (ADR-001 D7 / AC-7)?
|
||||
|
||||
A STOP that lands here must NOT tear the step apart (half-merge / detached prod
|
||||
deploy / dead prod container, NFR-3). Two markers (existing, no new state):
|
||||
* self-deploy Phase B initiated — the ``INITIATED`` sentinel in
|
||||
``<repos_dir>/.deploy-state-<repo>/<wi>/`` (ORCH-036);
|
||||
* the task currently HOLDS the per-repo merge-lease
|
||||
``<repos_dir>/.merge-lease-<repo>.json`` (ORCH-043), holder branch == task
|
||||
branch.
|
||||
|
||||
fail-CLOSED (TR-3): any error/uncertainty -> True (DEFER cancellation). Outside
|
||||
the window -> False (apply the full reset immediately).
|
||||
"""
|
||||
if not task:
|
||||
return False
|
||||
repo = task.get("repo")
|
||||
work_item_id = task.get("work_item_id")
|
||||
branch = task.get("branch")
|
||||
try:
|
||||
from . import self_deploy
|
||||
if self_deploy.has_marker(repo, work_item_id, self_deploy.INITIATED):
|
||||
return True
|
||||
except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt
|
||||
logger.warning("cancel.in_critical_window self_deploy probe error: %s", e)
|
||||
return True
|
||||
try:
|
||||
from . import merge_gate
|
||||
holder = merge_gate.current_lease_holder(repo)
|
||||
if holder and branch and holder == branch:
|
||||
return True
|
||||
except Exception as e: # noqa: BLE001 - fail-CLOSED on doubt
|
||||
logger.warning("cancel.in_critical_window merge-lease probe error: %s", e)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def snapshot() -> dict:
|
||||
"""Read-only STOP-cancellation summary for GET /queue (AC-10).
|
||||
|
||||
Additive block; existing /queue keys are untouched. never-raise -> a minimal
|
||||
dict with the flags on error.
|
||||
"""
|
||||
try:
|
||||
enabled = bool(getattr(settings, "stop_status_enabled", False))
|
||||
except Exception: # noqa: BLE001
|
||||
enabled = False
|
||||
try:
|
||||
repos_cfg = getattr(settings, "stop_status_repos", "") or ""
|
||||
except Exception: # noqa: BLE001
|
||||
repos_cfg = ""
|
||||
try:
|
||||
from . import db
|
||||
stats = db.cancelled_tasks_snapshot(10)
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel.snapshot error: %s", e)
|
||||
stats = {"count": 0, "pending": 0, "recent": []}
|
||||
return {
|
||||
"enabled": enabled,
|
||||
"repos": repos_cfg,
|
||||
"cancelled_count": stats.get("count", 0),
|
||||
"deferred_pending": stats.get("pending", 0),
|
||||
"recent": stats.get("recent", []),
|
||||
}
|
||||
@@ -605,6 +605,25 @@ class Settings(BaseSettings):
|
||||
serial_gate_repos: str = ""
|
||||
serial_gate_freeze_enabled: bool = True
|
||||
|
||||
# ORCH-090: STOP-status task cancellation (stop active agent + full progress
|
||||
# reset) and the relaunch-hole close. A new logical Plane key `stop` (fail-closed,
|
||||
# absent from _DEFAULT_STATES) routes to a cancel handler that drives the task to
|
||||
# the new system-terminal state `cancelled` (stage + durable). Additive,
|
||||
# never-raise, restart-safe; STAGE_TRANSITIONS / QG_CHECKS / check_* / existing
|
||||
# status semantics are NOT touched. See
|
||||
# docs/work-items/ORCH-090/06-adr/ADR-001-stop-cancel-task.md and the cross-cutting
|
||||
# docs/architecture/adr/adr-0026-stop-cancel-task.md.
|
||||
# stop_status_enabled -> kill-switch (env ORCH_STOP_STATUS_ENABLED). False ->
|
||||
# STOP handling AND the relaunch-hole gate are inert
|
||||
# (behaviour strictly as before ORCH-090 — zero
|
||||
# regression, AC-8).
|
||||
# stop_status_repos -> CSV scope (env ORCH_STOP_STATUS_REPOS). Empty -> applies
|
||||
# to ALL repos (cancellation is meaningful for enduro too);
|
||||
# non-empty -> only the listed repos. Tokens are sanitised
|
||||
# (^[A-Za-z0-9._-]+$) by the cancel leaf.
|
||||
stop_status_enabled: bool = True
|
||||
stop_status_repos: str = ""
|
||||
|
||||
# ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify
|
||||
# under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a
|
||||
# secondary deterministic (no-LLM) guard checks that a declarative set of markers
|
||||
|
||||
208
src/db.py
208
src/db.py
@@ -59,7 +59,7 @@ def init_db():
|
||||
repo TEXT NOT NULL,
|
||||
task_id INTEGER, -- FK tasks.id (nullable)
|
||||
task_content TEXT, -- written to the agent task_file
|
||||
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
|
||||
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed|cancelled (ORCH-090: cancelled is a terminal outcome, never requeued)
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||||
run_id INTEGER, -- agent_runs.id once started
|
||||
@@ -129,6 +129,17 @@ def init_db():
|
||||
# tracker can show "твоё время" without recomputing from activity history.
|
||||
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
||||
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
||||
# ORCH-090 (08-data-requirements.md): STOP-cancellation durable markers. Both are
|
||||
# additive, idempotent (_ensure_column is a no-op once present) -> safe on the live
|
||||
# shared prod DB (enduro untouched). The durable terminal itself is tasks.stage=
|
||||
# 'cancelled' (already understood by the reconciler terminal-skip); these columns
|
||||
# are audit/observability + the deferred-cancel signal.
|
||||
# cancelled_at -> timestamp the task was cancelled (NULL otherwise).
|
||||
# cancel_requested_at -> STOP arrived inside a critical merge/deploy window
|
||||
# (ADR-001 D7): cancellation is DEFERRED until the
|
||||
# irreversible step finishes honestly, then applied.
|
||||
_ensure_column(conn, "tasks", "cancelled_at", "TEXT")
|
||||
_ensure_column(conn, "tasks", "cancel_requested_at", "TEXT")
|
||||
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
|
||||
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
|
||||
# scheduler gate in claim_next_job keeps B queued until every A reaches
|
||||
@@ -231,6 +242,13 @@ def get_active_tasks_for_reconcile() -> list[dict]:
|
||||
``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC
|
||||
'now', matching how ``update_task_stage`` stamps ``updated_at``). The
|
||||
reconciler applies the per-stage grace and active-job guard on top.
|
||||
|
||||
ORCH-090 (adr-0026): a ``cancelled`` task is DELIBERATELY still returned here
|
||||
and skipped by the reconciler's own terminal-skip (``stage in
|
||||
('done','cancelled')``, ORCH-086 D2) — narrowing the query to exclude
|
||||
``cancelled`` would lose the observability skip-counter increment that ORCH-086
|
||||
relies on. The terminal set is harmonised in the *scheduler* predicates
|
||||
(serial_gate / task_deps), not here.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
@@ -605,7 +623,9 @@ def claim_next_job() -> dict | None:
|
||||
dep_gate = (
|
||||
"AND NOT EXISTS ("
|
||||
" SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
||||
" WHERE d.task_id = jobs.task_id AND t.stage != 'done'"
|
||||
# ORCH-090 (adr-0026): a cancelled predecessor is TERMINAL -> the
|
||||
# dependent must NOT wait on it forever. Terminal set = {done,cancelled}.
|
||||
" WHERE d.task_id = jobs.task_id AND t.stage NOT IN ('done','cancelled')"
|
||||
") "
|
||||
)
|
||||
# ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW
|
||||
@@ -683,11 +703,11 @@ def mark_job(
|
||||
run_id: int | None = None,
|
||||
error: str | None = None,
|
||||
):
|
||||
"""Update a job's status (queued|running|done|failed).
|
||||
"""Update a job's status (queued|running|done|failed|cancelled).
|
||||
|
||||
- run_id (optional): link to the agent_runs row that executed this job.
|
||||
- error (optional): last error message (for failed/retry).
|
||||
- 'done'/'failed' also stamp finished_at.
|
||||
- 'done'/'failed'/'cancelled' (ORCH-090) also stamp finished_at.
|
||||
- 'queued' (requeue for retry) clears started_at/finished_at so the next
|
||||
claim treats it as fresh.
|
||||
"""
|
||||
@@ -700,7 +720,7 @@ def mark_job(
|
||||
if error is not None:
|
||||
sets.append("error = ?")
|
||||
params.append(error)
|
||||
if status in ("done", "failed"):
|
||||
if status in ("done", "failed", "cancelled"):
|
||||
sets.append("finished_at = datetime('now')")
|
||||
elif status == "queued":
|
||||
sets.append("started_at = NULL")
|
||||
@@ -728,6 +748,178 @@ def has_active_job_for_task(task_id: int) -> bool:
|
||||
return row is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-090: STOP-cancellation helpers (task + jobs terminal state)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_task(task_id: int) -> dict | None:
|
||||
"""Fetch a single task row by id (None when absent)."""
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
def get_active_jobs_for_task(task_id: int) -> list[dict]:
|
||||
"""ORCH-090: queued/running jobs of a task (for STOP — stop agent + cancel).
|
||||
|
||||
Returns the full job rows (incl. ``pid`` / ``run_id`` / ``status``) so the
|
||||
cancel orchestrator can SIGTERM the running agent by ``jobs.pid`` and then flip
|
||||
every job to the terminal ``cancelled`` outcome.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM jobs WHERE task_id = ? AND status IN ('queued','running') "
|
||||
"ORDER BY id",
|
||||
(task_id,),
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def cancel_jobs_for_task(task_id: int, only_queued: bool = False) -> int:
|
||||
"""ORCH-090 (ADR-001 D3): flip a task's jobs to the terminal ``cancelled`` outcome.
|
||||
|
||||
Guarded UPDATE over ``status IN ('queued','running')`` (or only ``'queued'`` when
|
||||
``only_queued`` — the deferred-cancel path inside a critical merge/deploy window,
|
||||
D7, which must NOT cancel the still-running deploy/merge actor). ``cancelled`` is
|
||||
never requeued: ``claim_next_job`` only selects ``status='queued'`` and the reaper
|
||||
/ worker check the task's terminal stage before any requeue. Returns the number of
|
||||
jobs cancelled. never-raise -> 0 on error.
|
||||
"""
|
||||
statuses = "('queued')" if only_queued else "('queued','running')"
|
||||
try:
|
||||
conn = get_db()
|
||||
try:
|
||||
cur = conn.execute(
|
||||
f"UPDATE jobs SET status='cancelled', finished_at=datetime('now') "
|
||||
f"WHERE task_id = ? AND status IN {statuses}",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
return cur.rowcount or 0
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def mark_task_cancelled(task_id: int) -> bool:
|
||||
"""ORCH-090 (ADR-001 D4): durable terminal + natural-key tombstone for a task.
|
||||
|
||||
Atomically (single UPDATE):
|
||||
* ``stage='cancelled'`` (durable terminal, understood by the reconciler skip);
|
||||
* ``cancelled_at=now``, ``cancel_requested_at=NULL`` (clear any deferred flag);
|
||||
* TOMBSTONE the natural keys so a later "To Analyse" re-creates the task FROM
|
||||
SCRATCH: ``plane_id`` / ``work_item_id`` / ``plane_issue_id`` get a
|
||||
deterministic ``#cancelled-<id>`` suffix -> ``get_task_by_plane_id`` returns
|
||||
None and the anti-dup / uniqueness guards no longer collide. The row is NOT
|
||||
deleted (durable audit).
|
||||
|
||||
ADR-001 D4 refinement (ORCH-090): the ADR proposed keeping ``plane_issue_id``
|
||||
untouched for audit, but ``get_task_by_plane_id`` / ``create_task_atomic`` match
|
||||
on ``plane_id OR plane_issue_id`` — leaving ``plane_issue_id`` matchable would
|
||||
keep the cancelled row "findable" and BLOCK the clean-slate re-create (BR-3 /
|
||||
TR-4). We therefore suffix it too; the ``#cancelled-<id>`` tag is deterministic
|
||||
and parseable, so the original Plane issue UUID (== the original ``plane_id`` in
|
||||
every create path) is still fully recoverable for audit.
|
||||
|
||||
Idempotent-safe: the suffix is only appended when not already present (a repeat
|
||||
STOP on an already-cancelled row does not double-suffix). Returns True iff the
|
||||
row was updated. never-raise -> False on error.
|
||||
"""
|
||||
try:
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT plane_id, work_item_id, plane_issue_id FROM tasks WHERE id = ?",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return False
|
||||
suffix = f"#cancelled-{task_id}"
|
||||
|
||||
def _tomb(v):
|
||||
v = v or ""
|
||||
return v if suffix in v else f"{v}{suffix}"
|
||||
|
||||
plane_id = _tomb(row["plane_id"])
|
||||
work_item_id = _tomb(row["work_item_id"])
|
||||
plane_issue_id = _tomb(row["plane_issue_id"])
|
||||
conn.execute(
|
||||
"UPDATE tasks SET stage='cancelled', cancelled_at=datetime('now'), "
|
||||
"cancel_requested_at=NULL, plane_id=?, work_item_id=?, plane_issue_id=?, "
|
||||
"updated_at=datetime('now') WHERE id = ?",
|
||||
(plane_id, work_item_id, plane_issue_id, task_id),
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def set_task_cancel_requested(task_id: int) -> bool:
|
||||
"""ORCH-090 (ADR-001 D7): mark a deferred cancellation (STOP in critical window).
|
||||
|
||||
Idempotent: only stamps ``cancel_requested_at`` the first time. The deterministic
|
||||
deploy/merge finalizer reads it once the irreversible step completes and then
|
||||
applies the full cancellation. never-raise -> False on error.
|
||||
"""
|
||||
try:
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET cancel_requested_at=datetime('now') "
|
||||
"WHERE id = ? AND cancel_requested_at IS NULL",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def cancelled_tasks_snapshot(limit: int = 10) -> dict:
|
||||
"""ORCH-090 (AC-10): read-only cancellation summary for GET /queue.
|
||||
|
||||
Returns ``{count, pending, recent}`` where ``count`` is the number of cancelled
|
||||
tasks, ``pending`` the number with a deferred (not-yet-applied) cancellation, and
|
||||
``recent`` the last ``limit`` cancelled tasks. never-raise -> minimal dict.
|
||||
"""
|
||||
try:
|
||||
conn = get_db()
|
||||
try:
|
||||
count = conn.execute(
|
||||
"SELECT COUNT(*) FROM tasks WHERE stage='cancelled'"
|
||||
).fetchone()[0]
|
||||
pending = conn.execute(
|
||||
"SELECT COUNT(*) FROM tasks WHERE cancel_requested_at IS NOT NULL "
|
||||
"AND stage != 'cancelled'"
|
||||
).fetchone()[0]
|
||||
recent = [
|
||||
{"work_item_id": r["work_item_id"], "repo": r["repo"],
|
||||
"cancelled_at": r["cancelled_at"]}
|
||||
for r in conn.execute(
|
||||
"SELECT work_item_id, repo, cancelled_at FROM tasks "
|
||||
"WHERE stage='cancelled' ORDER BY cancelled_at DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
]
|
||||
finally:
|
||||
conn.close()
|
||||
return {"count": int(count), "pending": int(pending), "recent": recent}
|
||||
except Exception:
|
||||
return {"count": 0, "pending": 0, "recent": []}
|
||||
|
||||
|
||||
def count_running_jobs() -> int:
|
||||
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||||
conn = get_db()
|
||||
@@ -815,7 +1007,7 @@ def reap_running_job(
|
||||
if error is not None:
|
||||
sets.append("error = ?")
|
||||
params.append(error)
|
||||
if status in ("done", "failed"):
|
||||
if status in ("done", "failed", "cancelled"): # ORCH-090: cancelled is terminal
|
||||
sets.append("finished_at = datetime('now')")
|
||||
elif status == "queued":
|
||||
sets.append("started_at = NULL")
|
||||
@@ -948,7 +1140,9 @@ def get_unfinished_dependencies(task_id: int) -> list[dict]:
|
||||
rows = conn.execute(
|
||||
"SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage "
|
||||
"FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
||||
"WHERE d.task_id = ? AND t.stage != 'done'",
|
||||
# ORCH-090 (adr-0026): {done,cancelled} are both terminal -> a
|
||||
# cancelled predecessor no longer blocks the dependent.
|
||||
"WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled')",
|
||||
(task_id,),
|
||||
).fetchall()
|
||||
finally:
|
||||
|
||||
65
src/gitea.py
Normal file
65
src/gitea.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""ORCH-090 (ADR-001 D8 / adr-0026): minimal Gitea branch helpers.
|
||||
|
||||
Leaf module — a single never-raise helper used by the STOP-cancellation path to
|
||||
delete a cancelled task's REMOTE feature branch. Deliberately tiny and dependency
|
||||
-light (only ``config`` + ``httpx``) so it can be imported from the stage engine
|
||||
without cycles.
|
||||
|
||||
Self-hosting safety (NFR-3): this helper deletes ONLY the named feature branch
|
||||
via the Gitea API. It NEVER touches ``main`` (a guard rejects it outright) and
|
||||
NEVER force-pushes — there is no push path here at all.
|
||||
"""
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import settings
|
||||
|
||||
logger = logging.getLogger("orchestrator.gitea")
|
||||
|
||||
# Branches that must never be deleted by an automated cancel (self-hosting safety).
|
||||
_PROTECTED_BRANCHES = {"main", "master"}
|
||||
|
||||
|
||||
def delete_remote_branch(repo: str, branch: str) -> bool:
|
||||
"""Delete a remote feature branch in Gitea (never-raise).
|
||||
|
||||
``DELETE /api/v1/repos/{owner}/{repo}/branches/{branch}``. Used by
|
||||
``stage_engine.cancel_task`` to reset a cancelled task's progress (D8). A 404
|
||||
(branch already gone) is treated as success — the goal state (branch absent) is
|
||||
reached. Returns True iff the branch is confirmed absent after the call.
|
||||
|
||||
Guards:
|
||||
* empty repo/branch -> no-op (False);
|
||||
* a protected branch (``main``/``master``) -> refused with an error log
|
||||
(NFR-3: STOP must never delete ``main``).
|
||||
Any network/API error is logged and swallowed (the worktree is cleaned locally
|
||||
regardless); returns False so the caller can note a best-effort miss.
|
||||
"""
|
||||
if not repo or not branch:
|
||||
return False
|
||||
if branch.strip().lower() in _PROTECTED_BRANCHES:
|
||||
logger.error(
|
||||
"delete_remote_branch REFUSED for protected branch %r in %s (self-hosting safety)",
|
||||
branch, repo,
|
||||
)
|
||||
return False
|
||||
owner = settings.gitea_owner
|
||||
url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/branches/{branch}"
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
try:
|
||||
resp = httpx.delete(url, headers=headers, timeout=10)
|
||||
if resp.status_code in (204, 200):
|
||||
logger.info("Deleted remote branch %s in %s/%s", branch, owner, repo)
|
||||
return True
|
||||
if resp.status_code == 404:
|
||||
logger.info("Remote branch %s already absent in %s/%s", branch, owner, repo)
|
||||
return True
|
||||
logger.warning(
|
||||
"delete_remote_branch %s in %s/%s returned %s: %s",
|
||||
branch, owner, repo, resp.status_code, resp.text[:200],
|
||||
)
|
||||
return False
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("delete_remote_branch error for %s/%s/%s: %s", owner, repo, branch, e)
|
||||
return False
|
||||
@@ -325,6 +325,16 @@ class JobReaper:
|
||||
attempts = int(job.get("attempts") or 0)
|
||||
max_attempts = int(job.get("max_attempts") or 2)
|
||||
err = f"reaped: {reason} (run_id={run_id})"
|
||||
# ORCH-090 (adr-0026 / TR-2): the source of truth for "do not revive" is the
|
||||
# task's TERMINAL stage, not the job status. If the task is already terminal
|
||||
# ({done,cancelled}) — e.g. STOP flipped it to 'cancelled' while this job was
|
||||
# still 'running' (dead pid) — flip the job to the terminal 'cancelled'
|
||||
# outcome instead of requeueing it (closes the SIGTERM/reaper requeue race).
|
||||
_branch, _stage, _wid = self._task_meta(job)
|
||||
if _stage in ("done", "cancelled"):
|
||||
if reap_running_job(job_id, "cancelled", run_id=run_id, error=err):
|
||||
self._note_reap(job, "cancelled", reason=f"{reason} (task terminal={_stage})")
|
||||
return
|
||||
if attempts < max_attempts:
|
||||
if reap_running_job(job_id, "queued", run_id=run_id, error=err):
|
||||
self._note_reap(job, "queued", reason=reason)
|
||||
|
||||
@@ -171,6 +171,7 @@ async def queue():
|
||||
from . import task_deps
|
||||
from . import serial_gate
|
||||
from . import labels
|
||||
from . import cancel
|
||||
from .disk_watchdog import disk_watchdog
|
||||
from .build_cache_pruner import build_cache_pruner
|
||||
return {
|
||||
@@ -191,6 +192,10 @@ async def queue():
|
||||
# ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch,
|
||||
# label names, scope. Additive block.
|
||||
"auto_labels": labels.snapshot(),
|
||||
# ORCH-090 (AC-10): STOP-cancellation observability (read-only) — kill-switch,
|
||||
# repo scope, cancelled/deferred counts, recent cancellations. Additive block;
|
||||
# never-raise.
|
||||
"stop": cancel.snapshot(),
|
||||
# ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) —
|
||||
# enabled, threshold, interval, last measurement per host-path. Additive
|
||||
# block; never-raise (status() returns {"enabled": ...} minimum on error).
|
||||
|
||||
@@ -340,6 +340,21 @@ def release_merge_lease(repo: str, branch: str | None = None) -> None:
|
||||
logger.warning("merge-lease release error for %s: %s", repo, e)
|
||||
|
||||
|
||||
def current_lease_holder(repo: str) -> str | None:
|
||||
"""ORCH-090: branch currently holding the per-repo merge-lease, or None.
|
||||
|
||||
Read-only helper used by ``cancel.in_critical_window`` to decide whether a STOP
|
||||
must be DEFERRED (the task is mid-merge). Never raises -> None on missing/corrupt
|
||||
lease or any error (the caller treats an error as fail-CLOSED itself).
|
||||
"""
|
||||
try:
|
||||
existing = _read_lease(_lease_path(repo))
|
||||
return existing.get("branch") if existing else None
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("current_lease_holder error for %s: %s", repo, e)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-065: proactive stale/dead merge-lease reclaim (Problem B)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -148,6 +148,13 @@ _PLANE_NAME_TO_KEY: dict[str, str] = {
|
||||
# this board status (enduro / API fallback) fail-closed — no UUID, no
|
||||
# confirm-deploy branch, no KeyError (accessed via .get).
|
||||
"Confirm Deploy": "confirm_deploy",
|
||||
# ORCH-090: dedicated operator "STOP" status — the cancel trigger. Like
|
||||
# ORCH-059's Confirm Deploy it is INTENTIONALLY ABSENT from _DEFAULT_STATES
|
||||
# (fail-closed): environments without the status (enduro / API fallback)
|
||||
# resolve `stop` to None via .get -> the cancel branch simply never activates
|
||||
# (no UUID, no KeyError, no blind cancel). Create a STOP status with the
|
||||
# `cancelled` group on the board to enable it (07-infra-requirements.md).
|
||||
"STOP": "stop",
|
||||
# ORCH-066: meaningful per-stage / human-input statuses (layer B).
|
||||
"To Analyse": "to_analyse",
|
||||
"Analysis": "analysis",
|
||||
|
||||
@@ -187,12 +187,18 @@ class QueueWorker:
|
||||
# launch error so the job does not wedge as 'running' forever.
|
||||
logger.error(f"Worker failed to launch job {job['id']}: {e}")
|
||||
try:
|
||||
from .db import get_job, mark_job
|
||||
from .db import get_job, mark_job, get_task
|
||||
|
||||
j = get_job(job["id"])
|
||||
attempts = j.get("attempts", 0) if j else 0
|
||||
max_attempts = j.get("max_attempts", 2) if j else 2
|
||||
if attempts < max_attempts:
|
||||
# ORCH-090 (adr-0026 / TR-2): never requeue a job whose task is
|
||||
# already terminal ({done,cancelled}) — a STOP that landed between
|
||||
# claim and launch must win over the retry budget.
|
||||
task = get_task(job.get("task_id")) if job.get("task_id") else None
|
||||
if task and task.get("stage") in ("done", "cancelled"):
|
||||
mark_job(job["id"], "cancelled", error=f"launch error (task terminal): {e}")
|
||||
elif attempts < max_attempts:
|
||||
mark_job(job["id"], "queued", error=f"launch error: {e}")
|
||||
else:
|
||||
mark_job(job["id"], "failed", error=f"launch error: {e}")
|
||||
|
||||
@@ -110,14 +110,19 @@ def repo_has_active_task(repo: str, exclude_task_id: int | None = None) -> bool:
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
# ORCH-090 (adr-0026): terminal set is {done,cancelled}. A cancelled
|
||||
# task must NOT count as "active" or it would block the repo's serial
|
||||
# gate forever.
|
||||
if exclude_task_id is not None:
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo=? AND id != ? AND stage != 'done' LIMIT 1",
|
||||
"SELECT 1 FROM tasks WHERE repo=? AND id != ? "
|
||||
"AND stage NOT IN ('done','cancelled') LIMIT 1",
|
||||
(repo, exclude_task_id),
|
||||
).fetchone()
|
||||
else:
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks WHERE repo=? AND stage != 'done' LIMIT 1",
|
||||
"SELECT 1 FROM tasks WHERE repo=? "
|
||||
"AND stage NOT IN ('done','cancelled') LIMIT 1",
|
||||
(repo,),
|
||||
).fetchone()
|
||||
return row is not None
|
||||
@@ -264,10 +269,12 @@ def build_claim_clause() -> str:
|
||||
repo_scope = f"AND jobs.repo IN ({repo_in}) "
|
||||
else:
|
||||
repo_scope = ""
|
||||
# ORCH-090 (adr-0026): {done,cancelled} are both terminal — an EARLIER
|
||||
# cancelled task no longer holds the FIFO serial gate closed.
|
||||
active_clause = (
|
||||
"EXISTS (SELECT 1 FROM tasks t2 "
|
||||
"WHERE t2.repo = jobs.repo AND t2.id < jobs.task_id "
|
||||
"AND t2.stage != 'done') "
|
||||
"AND t2.stage NOT IN ('done','cancelled')) "
|
||||
)
|
||||
if _freeze_layer_enabled():
|
||||
freeze_clause = (
|
||||
@@ -329,9 +336,10 @@ def _per_repo_snapshot(repo: str) -> dict:
|
||||
try:
|
||||
conn = db.get_db()
|
||||
try:
|
||||
# ORCH-090 (adr-0026): terminal set {done,cancelled}.
|
||||
row = conn.execute(
|
||||
"SELECT work_item_id, stage FROM tasks "
|
||||
"WHERE repo=? AND stage != 'done' ORDER BY id LIMIT 1",
|
||||
"WHERE repo=? AND stage NOT IN ('done','cancelled') ORDER BY id LIMIT 1",
|
||||
(repo,),
|
||||
).fetchone()
|
||||
if row:
|
||||
|
||||
@@ -1656,6 +1656,28 @@ def run_deploy_finalizer(job: dict):
|
||||
finished_agent="deployer",
|
||||
)
|
||||
|
||||
# ORCH-090 (ADR-001 D7 / AC-7): a STOP that arrived during the prod deploy was
|
||||
# DEFERRED (cancel_requested_at). The irreversible step has now finished honestly
|
||||
# above, so apply the deferred cancellation. force=True bypasses ONLY the
|
||||
# critical-window guard (the INITIATED marker may still linger) — a task that
|
||||
# reached terminal 'done' (SUCCESS) is an honest no-op (code is already in prod);
|
||||
# a FAILED deploy rolled back to development is fully reset now.
|
||||
try:
|
||||
from .db import get_task as _get_task
|
||||
t = _get_task(task_id)
|
||||
if t and t.get("cancel_requested_at") and t.get("stage") != "cancelled":
|
||||
logger.warning(
|
||||
"Task %s: applying deferred STOP after deploy finalize", task_id
|
||||
)
|
||||
cancel_task(
|
||||
task_id,
|
||||
reason="deferred STOP applied after deploy finalize",
|
||||
source="deferred",
|
||||
force=True,
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - never break the finalizer
|
||||
logger.warning("Task %s: deferred-cancel application failed: %s", task_id, e)
|
||||
|
||||
|
||||
def run_post_deploy_monitor(job: dict):
|
||||
"""ORCH-021 — one post-deploy monitor tick (reserved-agent, no LLM).
|
||||
@@ -1825,3 +1847,182 @@ def _notify_post_deploy(work_item_id: str, message: str) -> None:
|
||||
plane_add_comment(work_item_id, message, author="deployer")
|
||||
except Exception as e: # noqa: BLE001 - never break the tick
|
||||
logger.warning(f"post-deploy notify plane failed for {work_item_id}: {e}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-090 (ADR-001 / adr-0026): STOP-cancellation orchestration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def cancel_task(
|
||||
task_id: int,
|
||||
*,
|
||||
reason: str = "",
|
||||
source: str = "stop",
|
||||
force: bool = False,
|
||||
) -> dict:
|
||||
"""Cancel a task: stop the active agent + full progress reset (ORCH-090).
|
||||
|
||||
The single orchestration point behind the Plane STOP status (``webhooks/plane.
|
||||
handle_stop``). Drives the task to the system-terminal state ``cancelled``:
|
||||
|
||||
1. **Idempotency (BR-5 / AC-6):** an absent task or one already terminal
|
||||
(``stage in {done,cancelled}``) is a no-op — no re-kill, no re-cleanup, no
|
||||
duplicate notification.
|
||||
2. **Critical window (ADR-001 D7 / AC-7):** if the task is mid merge/deploy
|
||||
(``cancel.in_critical_window``) and not ``force``, the cancellation is
|
||||
DEFERRED: stamp ``cancel_requested_at``, cancel ONLY queued jobs (never the
|
||||
running deploy/merge actor), alert, and return — the deterministic deploy
|
||||
finalizer applies the cancel once the irreversible step finishes honestly.
|
||||
STOP NEVER touches ``main`` / force-pushes / restarts the prod container.
|
||||
3. **Full reset:** SIGTERM the running agent through the graceful cascade
|
||||
(``launcher.stop_process``), cancel all jobs (terminal ``cancelled``),
|
||||
clear deploy-state + release a held merge-lease (best-effort), remove the
|
||||
worktree, delete the remote feature branch, then tombstone the natural keys
|
||||
+ flip ``stage='cancelled'`` (durable). Docs artefacts are NOT touched.
|
||||
4. **Observability (AC-10):** log + Telegram + Plane comment + tracker update.
|
||||
|
||||
``force=True`` bypasses ONLY the critical-window guard (used by the deploy
|
||||
finalizer to apply a deferred cancel after the step completes) — it never
|
||||
overrides the terminal-stage idempotency. Returns a small result dict for
|
||||
tests/observability. never-raise: any error is logged; a notify failure never
|
||||
aborts the cancellation.
|
||||
"""
|
||||
from .db import (
|
||||
get_task, get_active_jobs_for_task, cancel_jobs_for_task,
|
||||
mark_task_cancelled, set_task_cancel_requested,
|
||||
)
|
||||
from . import cancel as cancel_mod
|
||||
|
||||
result = {"ok": False, "task_id": task_id, "deferred": False,
|
||||
"stopped": 0, "cancelled_jobs": 0, "note": None}
|
||||
|
||||
task = get_task(task_id)
|
||||
if not task:
|
||||
result["note"] = "no-task"
|
||||
logger.info("cancel_task: no task row for task_id=%s", task_id)
|
||||
return result
|
||||
|
||||
stage = task.get("stage")
|
||||
repo = task.get("repo")
|
||||
branch = task.get("branch") or ""
|
||||
work_item_id = task.get("work_item_id") or ""
|
||||
|
||||
# (1) Idempotency: already terminal -> no-op.
|
||||
if stage in ("done", "cancelled"):
|
||||
result["ok"] = True
|
||||
result["note"] = f"already-terminal:{stage}"
|
||||
logger.info(
|
||||
"cancel_task: task %s (%s) already terminal (stage=%s) -> no-op",
|
||||
task_id, work_item_id, stage,
|
||||
)
|
||||
return result
|
||||
|
||||
# (2) Critical merge/deploy window -> DEFER (unless forced by the finalizer).
|
||||
if not force and cancel_mod.in_critical_window(task):
|
||||
set_task_cancel_requested(task_id)
|
||||
result["cancelled_jobs"] = cancel_jobs_for_task(task_id, only_queued=True)
|
||||
result["deferred"] = True
|
||||
result["ok"] = True
|
||||
result["note"] = "deferred-critical-window"
|
||||
msg = (
|
||||
f"⏸️ {link_for(work_item_id)}: STOP получен во время "
|
||||
f"критичного шага (merge/deploy) — отмена ОТЛОЖЕНА до честного "
|
||||
f"завершения шага. main/прод не трогаются."
|
||||
)
|
||||
_notify_cancel(work_item_id, task_id, msg)
|
||||
logger.warning(
|
||||
"cancel_task: task %s (%s) in critical window -> deferred cancel "
|
||||
"(queued jobs cancelled=%s)", task_id, work_item_id, result["cancelled_jobs"],
|
||||
)
|
||||
return result
|
||||
|
||||
# (3) Full reset ----------------------------------------------------------
|
||||
# 3a. Stop the active agent through the graceful cascade (AC-1). Capture the
|
||||
# running jobs BEFORE cancelling them so we still know their pids.
|
||||
stopped = 0
|
||||
try:
|
||||
from .agents.launcher import launcher
|
||||
for job in get_active_jobs_for_task(task_id):
|
||||
if job.get("status") == "running" and job.get("pid"):
|
||||
try:
|
||||
if launcher.stop_process(
|
||||
job["pid"], job.get("run_id"), reason=f"STOP cancel task {task_id}"
|
||||
):
|
||||
stopped += 1
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel_task: stop_process failed for job %s: %s",
|
||||
job.get("id"), e)
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel_task: agent-stop step failed for task %s: %s", task_id, e)
|
||||
result["stopped"] = stopped
|
||||
|
||||
# 3b. Cancel ALL jobs (terminal 'cancelled', never requeued).
|
||||
result["cancelled_jobs"] = cancel_jobs_for_task(task_id)
|
||||
|
||||
# 3c. Clear deploy-state sentinels + release a held merge-lease (best-effort).
|
||||
# Outside a critical window the task does not hold the lease / has no
|
||||
# INITIATED marker, but clearing is idempotent and harmless.
|
||||
try:
|
||||
self_deploy.clear_state(repo, work_item_id)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("cancel_task: clear deploy-state failed for %s: %s", work_item_id, e)
|
||||
try:
|
||||
merge_gate.release_merge_lease(repo, branch)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("cancel_task: merge-lease release failed for %s: %s", branch, e)
|
||||
|
||||
# 3d. Remove the worktree + delete the remote feature branch (never main).
|
||||
if branch:
|
||||
try:
|
||||
from .git_worktree import remove_worktree
|
||||
remove_worktree(repo, branch)
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel_task: remove_worktree failed for %s/%s: %s",
|
||||
repo, branch, e)
|
||||
try:
|
||||
from . import gitea
|
||||
gitea.delete_remote_branch(repo, branch)
|
||||
except Exception as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("cancel_task: delete_remote_branch failed for %s/%s: %s",
|
||||
repo, branch, e)
|
||||
|
||||
# 3e. Durable terminal + natural-key tombstone (docs artefacts untouched).
|
||||
mark_task_cancelled(task_id)
|
||||
|
||||
# (4) Observability.
|
||||
note = f" ({reason})" if reason else ""
|
||||
msg = (
|
||||
f"\U0001f6d1 {link_for(work_item_id)}: задача ОТМЕНЕНА (STOP){note}. "
|
||||
f"Агент остановлен, job'ы сняты ({result['cancelled_jobs']}), ветка/worktree "
|
||||
f"удалены, прогресс сброшен. Docs сохранены. Перезапуск — только «To Analyse»."
|
||||
)
|
||||
_notify_cancel(work_item_id, task_id, msg)
|
||||
result["ok"] = True
|
||||
result["note"] = "cancelled" if not force else "cancelled-deferred-applied"
|
||||
logger.warning(
|
||||
"cancel_task: task %s (%s, repo=%s) CANCELLED (source=%s, force=%s): "
|
||||
"stopped=%s, cancelled_jobs=%s", task_id, work_item_id, repo, source, force,
|
||||
stopped, result["cancelled_jobs"],
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def _notify_cancel(work_item_id: str, task_id: int, message: str) -> None:
|
||||
"""Best-effort Telegram + Plane comment + tracker update for a cancellation.
|
||||
|
||||
Never raises — a notification failure must not abort the cancel (ORCH-090 FR-8).
|
||||
"""
|
||||
try:
|
||||
send_telegram(message)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("cancel notify telegram failed for %s: %s", work_item_id, e)
|
||||
if work_item_id:
|
||||
try:
|
||||
plane_add_comment(work_item_id, message, author="deployer")
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("cancel notify plane failed for %s: %s", work_item_id, e)
|
||||
try:
|
||||
from .notifications import update_task_tracker
|
||||
update_task_tracker(task_id)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.warning("cancel notify tracker failed for task %s: %s", task_id, e)
|
||||
|
||||
@@ -19,6 +19,13 @@ STAGE_TRANSITIONS = {
|
||||
"deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"},
|
||||
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
|
||||
"done": {"next": None, "agent": None, "qg": None},
|
||||
# ORCH-090 (adr-0026): system-terminal sink for a STOP-cancelled task. This is
|
||||
# NOT a new pipeline edge — no exit-gate of any edge changes — it only makes
|
||||
# get_next_stage('cancelled') correctly return None (parallel to 'done'). The
|
||||
# scheduler terminal predicate is `stage IN ('done','cancelled')`; the points
|
||||
# that recognise it carry the ORCH-090 marker (serial_gate / task_deps /
|
||||
# reconciler / job_reaper).
|
||||
"cancelled": {"next": None, "agent": None, "qg": None},
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -37,9 +37,12 @@ def is_task_ready(task_id: int) -> tuple[bool, list[str]]:
|
||||
"""Return ``(ready, waiting_on)`` for a task.
|
||||
|
||||
``ready`` is True when the task has no declared dependency whose predecessor
|
||||
is still un-done (``tasks.stage != 'done'``). ``waiting_on`` is the list of
|
||||
predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still blocked
|
||||
by — used for the Telegram waiting-line / Plane visibility.
|
||||
is still un-done. ORCH-090 (adr-0026): the terminal set is
|
||||
``{done, cancelled}`` — a CANCELLED predecessor is terminal and no longer
|
||||
blocks the dependent (the actual SQL predicate lives in
|
||||
``db.get_unfinished_dependencies`` / ``db.claim_next_job``). ``waiting_on`` is
|
||||
the list of predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still
|
||||
blocked by — used for the Telegram waiting-line / Plane visibility.
|
||||
|
||||
never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the
|
||||
scheduler omitting the gate when the DB read fails; a transient error must
|
||||
|
||||
@@ -160,8 +160,15 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
|
||||
# fallback) resolve to None, so the branch simply never activates (no KeyError,
|
||||
# no blind deploy). Checked before `approved` so the two gestures never alias.
|
||||
confirm_state = proj_states.get("confirm_deploy")
|
||||
# ORCH-090: dedicated operator STOP status -> cancel the task (stop agent + full
|
||||
# reset). fail-closed via .get (no UUID on a board without the status -> None ->
|
||||
# branch never activates, exactly like confirm_deploy). Checked FIRST so a STOP
|
||||
# is never aliased by to_analyse/approved/rejected.
|
||||
stop_state = proj_states.get("stop")
|
||||
# ORCH-066: start/resume trigger is `To Analyse` (human entry-point).
|
||||
if new_state == proj_states["to_analyse"]:
|
||||
if stop_state and new_state == stop_state:
|
||||
await handle_stop(data, project_id)
|
||||
elif new_state == proj_states["to_analyse"]:
|
||||
await handle_status_start(data, project_id)
|
||||
elif confirm_state and new_state == confirm_state:
|
||||
await handle_confirm_deploy(data, project_id)
|
||||
@@ -212,6 +219,44 @@ async def handle_confirm_deploy(data: dict, project_id: str = ""):
|
||||
)
|
||||
|
||||
|
||||
async def handle_stop(data: dict, project_id: str = ""):
|
||||
"""ORCH-090: a human flipped the issue to the dedicated STOP status — cancel
|
||||
the task (stop the active agent + full progress reset).
|
||||
|
||||
Resolves the task by plane_id and delegates to the unified
|
||||
``stage_engine.cancel_task`` (run off the event loop via asyncio.to_thread — it
|
||||
is synchronous and may sleep during the graceful SIGTERM cascade). Guards:
|
||||
* kill-switch / repo-scope via ``cancel.applies(repo)`` (False -> no-op-log);
|
||||
* idempotent — an absent / already-terminal task is a no-op inside cancel_task.
|
||||
Contract is never-raise (NFR-5): any error is logged, the webhook flow never
|
||||
crashes.
|
||||
"""
|
||||
import asyncio
|
||||
from .. import cancel
|
||||
from ..stage_engine import cancel_task
|
||||
|
||||
plane_id = str(data.get("id") or "")
|
||||
task = get_task_by_plane_id(plane_id)
|
||||
if not task:
|
||||
logger.info(f"STOP for {plane_id} but no task found, ignoring (no-op)")
|
||||
return
|
||||
|
||||
task_id = task["id"]
|
||||
repo = task.get("repo", "")
|
||||
if not cancel.applies(repo):
|
||||
logger.info(
|
||||
f"STOP for {plane_id} (task {task_id}, repo={repo}) but cancellation is "
|
||||
f"not applicable (kill-switch off / out of scope); no-op"
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(f"Task {task_id}: STOP status -> cancelling (stop agent + full reset)")
|
||||
try:
|
||||
await asyncio.to_thread(cancel_task, task_id, reason="Plane STOP status", source="stop")
|
||||
except Exception as e: # never-raise: the webhook flow must not crash
|
||||
logger.error(f"STOP handling failed for task {task_id}: {e}")
|
||||
|
||||
|
||||
async def handle_status_start(data: dict, project_id: str = ""):
|
||||
"""An issue moved into In Progress.
|
||||
|
||||
@@ -279,6 +324,36 @@ async def handle_status_start(data: dict, project_id: str = ""):
|
||||
)
|
||||
return
|
||||
|
||||
# ORCH-090 (ADR-001 D6 / AC-5): close the relaunch hole. The legitimate "answer
|
||||
# to Needs Input" resume is owned ONLY by the analyst (ORCH-066 — the sole
|
||||
# Needs-Input setter). A manual move of an EXISTING task at any OTHER stage to
|
||||
# "To Analyse" must NOT silently relaunch the mid-pipeline agent on the old
|
||||
# branch (the incident pattern). Gate the relaunch to `analysis`; any other
|
||||
# stage -> no-op-with-log + a best-effort Plane hint to use STOP -> To Analyse
|
||||
# for a clean-slate restart. Under the kill-switch off this gate is inert
|
||||
# (behaviour 1:1 as before ORCH-090).
|
||||
from ..config import settings as _settings
|
||||
if getattr(_settings, "stop_status_enabled", False) and current_stage != "analysis":
|
||||
logger.info(
|
||||
f"Status->To Analyse for {plane_id}: existing task on stage "
|
||||
f"'{current_stage}' — NOT relaunching {stage_agent} (relaunch-hole closed, "
|
||||
f"ORCH-090). Use STOP then To Analyse to restart from scratch."
|
||||
)
|
||||
try:
|
||||
_add_comment(
|
||||
work_item_id,
|
||||
"ℹ️ Перезапуск "
|
||||
"агента сменой "
|
||||
"рабочего статуса "
|
||||
"отключён (ORCH-090). Для "
|
||||
"перезапуска с нуля: "
|
||||
"STOP → To Analyse.",
|
||||
author=stage_agent,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to post relaunch-hole comment for {work_item_id}: {e}")
|
||||
return
|
||||
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
|
||||
|
||||
Reference in New Issue
Block a user