Files
orchestrator/tests/test_stop_status.py
claude-bot aae65969d5 fix(cancel): narrow STOP critical-window so deploy-park cancel applies (ORCH-090)
Review P1: a STOP while a self-hosting task is PARKED on `deploy` awaiting the
manual `Confirm Deploy` was classified as a critical merge/deploy window solely
because the task still held the per-repo merge-lease (held from merge-gate through
deploy->done). That window is fully reversible — nothing is merged or deployed yet
(the irreversible merge_pr runs later in _handle_merge_verify, always under an
INITIATED marker). So the cancel was DEFERRED to run_deploy_finalizer, which only
runs after Phase B (Confirm Deploy) — the very step the operator pressed STOP to
avoid. Result: the deferred cancel was never applied, the task wedged non-terminal
holding the lease, blocking the repo's serial-gate (ORCH-088) and merges.

Fix: gate the merge-lease branch of cancel.in_critical_window on an actively
RUNNING actor (_task_has_running_actor). Lease held + running deploy/merge job ->
still deferred (genuine in-flight step). Lease held + no running actor (idle
deploy parking) -> NOT critical -> immediate full reset, which itself releases the
lease (step 3c) and drives the task terminal. INITIATED-marker deferral unchanged.

Also fixes review P2 (AC-6): set_task_cancel_requested now returns the first-stamp
fact (rowcount), and the deferred branch only notifies on the first transition —
a repeated STOP while still deferred no longer spams duplicate notifications.

Tests: test_d7_lease_held_idle_parking_is_not_critical,
test_d7_lease_held_with_running_actor_still_critical,
test_d7_stop_on_deploy_awaiting_confirm_full_resets,
test_d7_repeated_stop_in_critical_window_no_duplicate_notify. Full suite green (1349).

Refs: ORCH-090

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 21:31:56 +03:00

521 lines
23 KiB
Python

"""ORCH-090 — STOP-status task cancellation + relaunch-hole close (unit + integ).
Covers 04-test-plan.yaml TC-01..TC-14 + the ADR-001 D7 deferred-cancel path:
TC-01 STOP recognised + routed to handle_stop; unknown task -> no-op, never-raise.
TC-02 active agent stopped via launcher.stop_process by jobs.pid; idle -> no-op.
TC-03 queued+running jobs of the task -> terminal 'cancelled'; claim skips them.
TC-04 reaper does NOT requeue a job of a terminal (cancelled) task.
TC-05 full reset: remove_worktree + delete_remote_branch called; main untouched.
TC-06 docs artefacts (and the task row) survive the reset.
TC-07 idempotency: STOP on cancelled / done / missing -> no-op, no exception.
TC-08 kill-switch off -> STOP inert; relaunch-hole gate inert.
TC-09 GET /queue carries a read-only `stop` block; never-raise.
TC-10 relaunch-hole closed: manual To Analyse on a mid-pipeline task -> no job.
TC-11 To Analyse on analysis (idle) relaunches analyst; new task -> start_pipeline.
TC-12 terminal-skip / restart-safe: reconciler skips a cancelled task; cancelled
jobs are not revived by requeue_running_jobs.
TC-13 e2e STOP: agent stopped, jobs cancelled, branch/worktree removed, durable
'cancelled', keys tombstoned, notifications fired.
TC-14 additive DB migration is idempotent (re-init_db) + columns present.
D7 STOP in a critical merge/deploy window is DEFERRED, then applied by the
deploy finalizer.
"""
import os
import tempfile
import pytest
os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_stop_status.db")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import src.db as db # noqa: E402
from src.db import ( # noqa: E402
init_db, get_db, claim_next_job, get_task,
cancel_jobs_for_task, mark_task_cancelled, get_task_by_plane_id,
requeue_running_jobs, get_job,
)
from src import config as cfg # noqa: E402
from src import cancel as cancel_mod # noqa: E402
from src import stage_engine # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
dbfile = tmp_path / "stop.db"
monkeypatch.setattr(db.settings, "db_path", str(dbfile))
# STOP feature ON, all repos. Isolate repos_dir so the critical-window probe
# (deploy markers / merge-lease) sees a clean tree by default.
monkeypatch.setattr(cfg.settings, "stop_status_enabled", True, raising=False)
monkeypatch.setattr(cfg.settings, "stop_status_repos", "", raising=False)
monkeypatch.setattr(cfg.settings, "repos_dir", str(tmp_path / "repos"), raising=False)
monkeypatch.setattr(cfg.settings, "host_repos_dir", str(tmp_path / "repos"), raising=False)
monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False)
monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False)
# Silence network side effects of cancel notifications.
monkeypatch.setattr("src.stage_engine.plane_add_comment", lambda *a, **k: None, raising=False)
monkeypatch.setattr("src.notifications.update_task_tracker", lambda *a, **k: None, raising=False)
init_db()
yield
# --------------------------------------------------------------------------- helpers
def _make_task(plane_id, work_item_id, stage="development", repo="orchestrator",
branch=None):
branch = branch or f"feature/{work_item_id}-slug"
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, stage, plane_id, work_item_id),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _make_job(task_id, repo="orchestrator", agent="developer", status="running",
pid=None, run_id=None, attempts=1, max_attempts=2):
conn = get_db()
cur = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, status, pid, run_id, attempts, max_attempts) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(agent, repo, task_id, status, pid, run_id, attempts, max_attempts),
)
jid = cur.lastrowid
conn.commit()
conn.close()
return jid
def _job_status(job_id):
j = get_job(job_id)
return j["status"] if j else None
def _stub_full_reset(monkeypatch):
"""Stub the side-effecting cleanup steps (signals / git / gitea) of a full reset."""
calls = {"stop": [], "worktree": [], "branch": []}
from src.agents.launcher import launcher
def _stop(pid, run_id, *, reason="stop"):
calls["stop"].append((pid, run_id, reason))
return True
monkeypatch.setattr(launcher, "stop_process", _stop, raising=True)
monkeypatch.setattr("src.git_worktree.remove_worktree",
lambda repo, branch: calls["worktree"].append((repo, branch)),
raising=True)
monkeypatch.setattr("src.gitea.delete_remote_branch",
lambda repo, branch: calls["branch"].append((repo, branch)) or True,
raising=True)
return calls
# =========================================================================== TC-01
@pytest.mark.asyncio
async def test_tc01_stop_routed_and_unknown_is_noop(monkeypatch):
from src.webhooks import plane as plane_wh
proj_states = {
"stop": "STOP-UUID", "to_analyse": "TA-UUID", "approved": "AP-UUID",
"rejected": "RJ-UUID", "confirm_deploy": None,
}
monkeypatch.setattr("src.plane_sync.get_project_states", lambda pid: proj_states)
seen = []
async def _stub_stop(data, project_id=""):
seen.append(data.get("id"))
monkeypatch.setattr(plane_wh, "handle_stop", _stub_stop)
# STOP state -> routed to handle_stop.
await plane_wh.handle_issue_updated({"id": "PL-1", "state": {"id": "STOP-UUID"}}, "proj")
assert seen == ["PL-1"]
# A non-STOP state does not route to handle_stop.
await plane_wh.handle_issue_updated({"id": "PL-2", "state": {"id": "AP-UUID"}}, "proj")
assert seen == ["PL-1"]
# Unknown task on the real handler -> no-op, never raises.
await plane_wh.handle_stop({"id": "does-not-exist"}, "proj")
# =========================================================================== TC-02
def test_tc02_stop_active_agent_by_pid(monkeypatch):
calls = _stub_full_reset(monkeypatch)
tid = _make_task("PL-10", "ORCH-310", stage="development")
_make_job(tid, status="running", pid=4242, run_id=77)
res = stage_engine.cancel_task(tid)
assert res["ok"] and not res["deferred"]
assert calls["stop"] == [(4242, 77, f"STOP cancel task {tid}")]
assert res["stopped"] == 1
def test_tc02_idle_agent_no_stop(monkeypatch):
calls = _stub_full_reset(monkeypatch)
tid = _make_task("PL-11", "ORCH-311", stage="development")
_make_job(tid, status="queued", pid=None) # no running process
res = stage_engine.cancel_task(tid)
assert res["ok"] and res["stopped"] == 0
assert calls["stop"] == []
# =========================================================================== TC-03
def test_tc03_jobs_cancelled_and_claim_skips(monkeypatch):
_stub_full_reset(monkeypatch)
tid = _make_task("PL-20", "ORCH-320", stage="development")
jq = _make_job(tid, status="queued")
jr = _make_job(tid, status="running", pid=None)
stage_engine.cancel_task(tid)
assert _job_status(jq) == "cancelled"
assert _job_status(jr) == "cancelled"
# claim_next_job selects only status='queued' -> a cancelled job is never claimed.
assert claim_next_job() is None
def test_tc03_cancel_jobs_helper_only_queued(monkeypatch):
tid = _make_task("PL-21", "ORCH-321")
jq = _make_job(tid, status="queued")
jr = _make_job(tid, status="running", pid=None)
n = cancel_jobs_for_task(tid, only_queued=True)
assert n == 1
assert _job_status(jq) == "cancelled"
assert _job_status(jr) == "running" # the running deploy/merge actor is left alone
# =========================================================================== TC-04
def test_tc04_reaper_does_not_requeue_terminal_task(monkeypatch):
from src.job_reaper import JobReaper
tid = _make_task("PL-30", "ORCH-330", stage="development")
jid = _make_job(tid, status="running", pid=999999, attempts=1, max_attempts=2)
# Task is flipped to cancelled (as STOP would) while the job is still running.
mark_task_cancelled(tid)
reaper = JobReaper()
job = get_job(jid)
reaper._reap_unknown_outcome(job, reason="dead pid")
# NOT requeued (attempts<max would normally requeue) -> terminal 'cancelled'.
assert _job_status(jid) == "cancelled"
# =========================================================================== TC-05
def test_tc05_full_reset_removes_branch_and_worktree(monkeypatch):
calls = _stub_full_reset(monkeypatch)
tid = _make_task("PL-40", "ORCH-340", stage="review", branch="feature/ORCH-340-x")
stage_engine.cancel_task(tid)
assert calls["worktree"] == [("orchestrator", "feature/ORCH-340-x")]
assert calls["branch"] == [("orchestrator", "feature/ORCH-340-x")]
def test_tc05_delete_remote_branch_refuses_main():
from src import gitea
# main is never deletable by the cancel path (self-hosting safety, NFR-3).
assert gitea.delete_remote_branch("orchestrator", "main") is False
assert gitea.delete_remote_branch("orchestrator", "master") is False
# =========================================================================== TC-06
def test_tc06_docs_and_task_row_survive(monkeypatch, tmp_path):
_stub_full_reset(monkeypatch)
tid = _make_task("PL-50", "ORCH-350", stage="development")
# A stand-in docs artefact: cancel must not delete it.
docs = tmp_path / "docs" / "work-items" / "ORCH-350"
docs.mkdir(parents=True)
(docs / "02-trz.md").write_text("trz")
stage_engine.cancel_task(tid)
assert (docs / "02-trz.md").exists(), "docs artefacts must be preserved"
# The task ROW is kept (durable audit), flipped to cancelled.
assert get_task(tid)["stage"] == "cancelled"
# =========================================================================== TC-07
def test_tc07_idempotent_on_cancelled_done_missing(monkeypatch):
calls = _stub_full_reset(monkeypatch)
# already cancelled
tid = _make_task("PL-60", "ORCH-360", stage="cancelled")
res = stage_engine.cancel_task(tid)
assert res["ok"] and res["note"].startswith("already-terminal")
assert calls["stop"] == [] and calls["branch"] == []
# done
tid2 = _make_task("PL-61", "ORCH-361", stage="done")
res2 = stage_engine.cancel_task(tid2)
assert res2["note"].startswith("already-terminal")
# missing
res3 = stage_engine.cancel_task(999999)
assert res3["note"] == "no-task"
# =========================================================================== TC-08
def test_tc08_kill_switch_off_inert(monkeypatch):
monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False)
assert cancel_mod.applies("orchestrator") is False
@pytest.mark.asyncio
async def test_tc08_kill_switch_off_handle_stop_noop(monkeypatch):
monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False)
calls = _stub_full_reset(monkeypatch)
from src.webhooks import plane as plane_wh
tid = _make_task("PL-70", "ORCH-370", stage="development")
_make_job(tid, status="running", pid=4242)
await plane_wh.handle_stop({"id": "PL-70"}, "proj")
# Nothing was cancelled (kill-switch off -> applies() False -> no-op).
assert calls["stop"] == []
assert get_task(tid)["stage"] == "development"
def test_tc08_scope_csv(monkeypatch):
monkeypatch.setattr(cfg.settings, "stop_status_repos", "enduro-trails", raising=False)
assert cancel_mod.applies("enduro-trails") is True
assert cancel_mod.applies("orchestrator") is False
# =========================================================================== TC-09
def test_tc09_queue_has_stop_block_and_keeps_keys(monkeypatch):
import asyncio
from src import main
payload = asyncio.run(main.queue())
for key in ("counts", "serial_gate", "task_deps", "auto_labels", "recent"):
assert key in payload, f"existing /queue key '{key}' preserved"
assert "stop" in payload
blk = payload["stop"]
assert blk["enabled"] is True
assert "repos" in blk and "cancelled_count" in blk and "recent" in blk
def test_tc09_snapshot_never_raises(monkeypatch):
# Force a DB error inside the snapshot -> minimal dict, no raise.
monkeypatch.setattr("src.db.cancelled_tasks_snapshot",
lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")))
snap = cancel_mod.snapshot()
assert snap["enabled"] is True and snap["cancelled_count"] == 0
# =========================================================================== TC-10
@pytest.mark.asyncio
async def test_tc10_relaunch_hole_closed_midpipeline(monkeypatch):
from src.webhooks import plane as plane_wh
monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False)
monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False)
tid = _make_task("PL-80", "ORCH-380", stage="development")
await plane_wh.handle_status_start({"id": "PL-80"}, "proj")
# No stage agent was relaunched (no job created) for a mid-pipeline task.
conn = get_db()
n = conn.execute("SELECT COUNT(*) FROM jobs WHERE task_id=?", (tid,)).fetchone()[0]
conn.close()
assert n == 0
# =========================================================================== TC-11
@pytest.mark.asyncio
async def test_tc11_analysis_idle_relaunches_analyst(monkeypatch):
from src.webhooks import plane as plane_wh
monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False)
monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False)
tid = _make_task("PL-90", "ORCH-390", stage="analysis")
await plane_wh.handle_status_start({"id": "PL-90"}, "proj")
conn = get_db()
rows = conn.execute("SELECT agent FROM jobs WHERE task_id=?", (tid,)).fetchall()
conn.close()
assert [r[0] for r in rows] == ["analyst"], "analyst resume is still legitimate"
@pytest.mark.asyncio
async def test_tc11_new_task_starts_pipeline(monkeypatch):
from src.webhooks import plane as plane_wh
started = []
async def _stub_start(data, project_id=""):
started.append(data.get("id"))
monkeypatch.setattr(plane_wh, "start_pipeline", _stub_start)
await plane_wh.handle_status_start({"id": "PL-NEW"}, "proj")
assert started == ["PL-NEW"] # the ONLY pipeline-start entry point
# =========================================================================== TC-12
def test_tc12_reconciler_skips_cancelled(monkeypatch):
from src.reconciler import Reconciler
# Avoid any Plane network in the gate pass.
monkeypatch.setattr("src.reconciler.fetch_issue_state",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("no net")),
raising=False)
tid = _make_task("PL-100", "ORCH-400", stage="development")
mark_task_cancelled(tid)
rec = Reconciler()
rec.reconcile_gate_once()
assert rec.skipped_terminal_total == 1
def test_tc12_requeue_running_does_not_revive_cancelled(monkeypatch):
tid = _make_task("PL-101", "ORCH-401", stage="development")
jc = _make_job(tid, status="running", pid=None)
cancel_jobs_for_task(tid) # -> cancelled
assert _job_status(jc) == "cancelled"
# Startup recovery flips only 'running' jobs; a cancelled job is untouched.
requeue_running_jobs()
assert _job_status(jc) == "cancelled"
# =========================================================================== TC-13
def test_tc13_end_to_end_stop(monkeypatch):
calls = _stub_full_reset(monkeypatch)
tid = _make_task("PL-110", "ORCH-410", stage="review", branch="feature/ORCH-410-e2e")
jr = _make_job(tid, status="running", pid=5555, run_id=11)
jq = _make_job(tid, status="queued")
res = stage_engine.cancel_task(tid, reason="Plane STOP status")
assert res["ok"] and not res["deferred"]
# agent stopped
assert calls["stop"] and calls["stop"][0][0] == 5555
# jobs cancelled
assert _job_status(jr) == "cancelled" and _job_status(jq) == "cancelled"
# worktree + branch removed
assert calls["worktree"] and calls["branch"]
# durable terminal + key tombstone (re-create via To Analyse no longer collides)
t = get_task(tid)
assert t["stage"] == "cancelled" and t["cancelled_at"]
assert t["plane_id"].endswith(f"#cancelled-{tid}")
assert t["work_item_id"].endswith(f"#cancelled-{tid}")
# plane_issue_id is tombstoned too (the lookup ORs on it) but the original UUID
# remains recoverable from the parseable suffix (audit link preserved).
assert t["plane_issue_id"] == f"PL-110#cancelled-{tid}"
assert t["plane_issue_id"].split("#cancelled-")[0] == "PL-110"
assert get_task_by_plane_id("PL-110") is None # freed for a fresh start
# =========================================================================== TC-14
def test_tc14_migration_idempotent_and_columns_present():
# Re-running init_db must not fail (idempotent _ensure_column).
init_db()
init_db()
conn = get_db()
cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()}
conn.close()
assert "cancelled_at" in cols and "cancel_requested_at" in cols
def test_tc14_existing_contracts_intact():
# The additive job status set still has the original statuses working.
tid = _make_task("PL-120", "ORCH-420")
jid = _make_job(tid, status="queued")
# A queued job is still claimable when no gate blocks it.
claimed = claim_next_job()
assert claimed is not None and claimed["id"] == jid
# =========================================================================== D7
def test_d7_stop_in_critical_window_defers(monkeypatch):
calls = _stub_full_reset(monkeypatch)
from src import self_deploy
tid = _make_task("PL-130", "ORCH-430", stage="deploy", branch="feature/ORCH-430-d")
# self-deploy Phase B initiated -> critical window.
self_deploy.write_marker("orchestrator", "ORCH-430", self_deploy.INITIATED, content="1")
jq = _make_job(tid, status="queued")
jr = _make_job(tid, status="running", pid=7777) # the deploy actor
res = stage_engine.cancel_task(tid)
assert res["deferred"] is True and res["ok"]
# Only queued jobs cancelled; the running deploy actor is NOT killed.
assert _job_status(jq) == "cancelled"
assert _job_status(jr) == "running"
assert calls["stop"] == [] and calls["branch"] == []
# The deferred flag is durable; the task is NOT yet terminal.
t = get_task(tid)
assert t["cancel_requested_at"] and t["stage"] == "deploy"
def test_d7_in_critical_window_detection(monkeypatch):
from src import self_deploy
task = {"repo": "orchestrator", "work_item_id": "ORCH-431", "branch": "feature/x"}
assert cancel_mod.in_critical_window(task) is False
self_deploy.write_marker("orchestrator", "ORCH-431", self_deploy.INITIATED, content="1")
assert cancel_mod.in_critical_window(task) is True
def test_d7_lease_held_idle_parking_is_not_critical(monkeypatch):
"""ORCH-090 review P1: a task PARKED on `deploy` awaiting Confirm Deploy holds the
merge-lease but is fully reversible -> NOT a critical window (else the deferred
cancel is never applied and the task wedges)."""
from src import merge_gate
os.makedirs(cfg.settings.repos_dir, exist_ok=True)
branch = "feature/ORCH-432-park"
tid = _make_task("PL-432", "ORCH-432", stage="deploy", branch=branch)
# Lease HELD by this task's branch, NO INITIATED marker, NO running job.
acquired, _ = merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-432")
assert acquired
assert merge_gate.current_lease_holder("orchestrator") == branch
task = get_task(tid)
assert cancel_mod.in_critical_window(task) is False
def test_d7_lease_held_with_running_actor_still_critical(monkeypatch):
"""Lease held AND a deploy/merge actor actually running -> still critical (defer)."""
from src import merge_gate
os.makedirs(cfg.settings.repos_dir, exist_ok=True)
branch = "feature/ORCH-433-merge"
tid = _make_task("PL-433", "ORCH-433", stage="deploy", branch=branch)
merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-433")
_make_job(tid, status="running", pid=9191) # the merge/deploy actor
task = get_task(tid)
assert cancel_mod.in_critical_window(task) is True
def test_d7_stop_on_deploy_awaiting_confirm_full_resets(monkeypatch):
"""End-to-end of the P1 fix: STOP while parked on `deploy` awaiting Confirm Deploy
-> immediate FULL reset (terminal cancelled, branch deleted, lease released)."""
calls = _stub_full_reset(monkeypatch)
from src import merge_gate
os.makedirs(cfg.settings.repos_dir, exist_ok=True)
branch = "feature/ORCH-434-park"
tid = _make_task("PL-434", "ORCH-434", stage="deploy", branch=branch)
merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-434")
res = stage_engine.cancel_task(tid)
assert res["ok"] and not res["deferred"]
assert res["note"] == "cancelled"
# Durable terminal + branch deleted -> repo no longer wedged.
assert get_task(tid)["stage"] == "cancelled"
assert calls["branch"], "full reset deletes the remote feature branch"
# The held lease was released (step 3c) -> the repo's serial-gate is unblocked.
assert merge_gate.current_lease_holder("orchestrator") is None
def test_d7_repeated_stop_in_critical_window_no_duplicate_notify(monkeypatch):
"""AC-6 / P2: a repeated STOP while still deferred does not re-notify."""
_stub_full_reset(monkeypatch)
from src import self_deploy
notifies = []
monkeypatch.setattr(stage_engine, "_notify_cancel",
lambda *a, **k: notifies.append(a), raising=True)
tid = _make_task("PL-435", "ORCH-435", stage="deploy", branch="feature/ORCH-435-d")
self_deploy.write_marker("orchestrator", "ORCH-435", self_deploy.INITIATED, content="1")
r1 = stage_engine.cancel_task(tid)
r2 = stage_engine.cancel_task(tid)
assert r1["deferred"] and r1["note"] == "deferred-critical-window"
assert r2["deferred"] and r2["note"] == "deferred-already-pending"
assert len(notifies) == 1, "only the first deferral transition notifies"
def test_d7_deferred_applied_by_finalizer(monkeypatch):
"""After the irreversible step finishes, the finalizer applies the deferred cancel."""
calls = _stub_full_reset(monkeypatch)
tid = _make_task("PL-140", "ORCH-440", stage="development", branch="feature/ORCH-440-d")
# Mark a deferred cancellation pending (as the critical-window path would).
db.set_task_cancel_requested(tid)
# force=True is what run_deploy_finalizer uses once the step completed honestly.
res = stage_engine.cancel_task(tid, force=True, source="deferred")
assert res["ok"] and not res["deferred"]
assert get_task(tid)["stage"] == "cancelled"
assert calls["branch"], "deferred cancel applies the full reset"