"""ORCH-090 — STOP-status task cancellation + relaunch-hole close (unit + integ). Covers 04-test-plan.yaml TC-01..TC-14 + the ADR-001 D7 deferred-cancel path: TC-01 STOP recognised + routed to handle_stop; unknown task -> no-op, never-raise. TC-02 active agent stopped via launcher.stop_process by jobs.pid; idle -> no-op. TC-03 queued+running jobs of the task -> terminal 'cancelled'; claim skips them. TC-04 reaper does NOT requeue a job of a terminal (cancelled) task. TC-05 full reset: remove_worktree + delete_remote_branch called; main untouched. TC-06 docs artefacts (and the task row) survive the reset. TC-07 idempotency: STOP on cancelled / done / missing -> no-op, no exception. TC-08 kill-switch off -> STOP inert; relaunch-hole gate inert. TC-09 GET /queue carries a read-only `stop` block; never-raise. TC-10 relaunch-hole closed: manual To Analyse on a mid-pipeline task -> no job. TC-11 To Analyse on analysis (idle) relaunches analyst; new task -> start_pipeline. TC-12 terminal-skip / restart-safe: reconciler skips a cancelled task; cancelled jobs are not revived by requeue_running_jobs. TC-13 e2e STOP: agent stopped, jobs cancelled, branch/worktree removed, durable 'cancelled', keys tombstoned, notifications fired. TC-14 additive DB migration is idempotent (re-init_db) + columns present. D7 STOP in a critical merge/deploy window is DEFERRED, then applied by the deploy finalizer. """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_stop_status.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import ( # noqa: E402 init_db, get_db, claim_next_job, get_task, cancel_jobs_for_task, mark_task_cancelled, get_task_by_plane_id, requeue_running_jobs, get_job, ) from src import config as cfg # noqa: E402 from src import cancel as cancel_mod # noqa: E402 from src import stage_engine # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): dbfile = tmp_path / "stop.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) # STOP feature ON, all repos. Isolate repos_dir so the critical-window probe # (deploy markers / merge-lease) sees a clean tree by default. monkeypatch.setattr(cfg.settings, "stop_status_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "stop_status_repos", "", raising=False) monkeypatch.setattr(cfg.settings, "repos_dir", str(tmp_path / "repos"), raising=False) monkeypatch.setattr(cfg.settings, "host_repos_dir", str(tmp_path / "repos"), raising=False) monkeypatch.setattr(cfg.settings, "serial_gate_enabled", False, raising=False) monkeypatch.setattr(cfg.settings, "task_deps_enabled", False, raising=False) # Silence network side effects of cancel notifications. monkeypatch.setattr("src.stage_engine.plane_add_comment", lambda *a, **k: None, raising=False) monkeypatch.setattr("src.notifications.update_task_tracker", lambda *a, **k: None, raising=False) init_db() yield # --------------------------------------------------------------------------- helpers def _make_task(plane_id, work_item_id, stage="development", repo="orchestrator", branch=None): branch = branch or f"feature/{work_item_id}-slug" conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (plane_id, work_item_id, repo, branch, stage, plane_id, work_item_id), ) tid = cur.lastrowid conn.commit() conn.close() return tid def _make_job(task_id, repo="orchestrator", agent="developer", status="running", pid=None, run_id=None, attempts=1, max_attempts=2): conn = get_db() cur = conn.execute( "INSERT INTO jobs (agent, repo, task_id, status, pid, run_id, attempts, max_attempts) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (agent, repo, task_id, status, pid, run_id, attempts, max_attempts), ) jid = cur.lastrowid conn.commit() conn.close() return jid def _job_status(job_id): j = get_job(job_id) return j["status"] if j else None def _stub_full_reset(monkeypatch): """Stub the side-effecting cleanup steps (signals / git / gitea) of a full reset.""" calls = {"stop": [], "worktree": [], "branch": []} from src.agents.launcher import launcher def _stop(pid, run_id, *, reason="stop"): calls["stop"].append((pid, run_id, reason)) return True monkeypatch.setattr(launcher, "stop_process", _stop, raising=True) monkeypatch.setattr("src.git_worktree.remove_worktree", lambda repo, branch: calls["worktree"].append((repo, branch)), raising=True) monkeypatch.setattr("src.gitea.delete_remote_branch", lambda repo, branch: calls["branch"].append((repo, branch)) or True, raising=True) return calls # =========================================================================== TC-01 @pytest.mark.asyncio async def test_tc01_stop_routed_and_unknown_is_noop(monkeypatch): from src.webhooks import plane as plane_wh proj_states = { "stop": "STOP-UUID", "to_analyse": "TA-UUID", "approved": "AP-UUID", "rejected": "RJ-UUID", "confirm_deploy": None, } monkeypatch.setattr("src.plane_sync.get_project_states", lambda pid: proj_states) seen = [] async def _stub_stop(data, project_id=""): seen.append(data.get("id")) monkeypatch.setattr(plane_wh, "handle_stop", _stub_stop) # STOP state -> routed to handle_stop. await plane_wh.handle_issue_updated({"id": "PL-1", "state": {"id": "STOP-UUID"}}, "proj") assert seen == ["PL-1"] # A non-STOP state does not route to handle_stop. await plane_wh.handle_issue_updated({"id": "PL-2", "state": {"id": "AP-UUID"}}, "proj") assert seen == ["PL-1"] # Unknown task on the real handler -> no-op, never raises. await plane_wh.handle_stop({"id": "does-not-exist"}, "proj") # =========================================================================== TC-02 def test_tc02_stop_active_agent_by_pid(monkeypatch): calls = _stub_full_reset(monkeypatch) tid = _make_task("PL-10", "ORCH-310", stage="development") _make_job(tid, status="running", pid=4242, run_id=77) res = stage_engine.cancel_task(tid) assert res["ok"] and not res["deferred"] assert calls["stop"] == [(4242, 77, f"STOP cancel task {tid}")] assert res["stopped"] == 1 def test_tc02_idle_agent_no_stop(monkeypatch): calls = _stub_full_reset(monkeypatch) tid = _make_task("PL-11", "ORCH-311", stage="development") _make_job(tid, status="queued", pid=None) # no running process res = stage_engine.cancel_task(tid) assert res["ok"] and res["stopped"] == 0 assert calls["stop"] == [] # =========================================================================== TC-03 def test_tc03_jobs_cancelled_and_claim_skips(monkeypatch): _stub_full_reset(monkeypatch) tid = _make_task("PL-20", "ORCH-320", stage="development") jq = _make_job(tid, status="queued") jr = _make_job(tid, status="running", pid=None) stage_engine.cancel_task(tid) assert _job_status(jq) == "cancelled" assert _job_status(jr) == "cancelled" # claim_next_job selects only status='queued' -> a cancelled job is never claimed. assert claim_next_job() is None def test_tc03_cancel_jobs_helper_only_queued(monkeypatch): tid = _make_task("PL-21", "ORCH-321") jq = _make_job(tid, status="queued") jr = _make_job(tid, status="running", pid=None) n = cancel_jobs_for_task(tid, only_queued=True) assert n == 1 assert _job_status(jq) == "cancelled" assert _job_status(jr) == "running" # the running deploy/merge actor is left alone # =========================================================================== TC-04 def test_tc04_reaper_does_not_requeue_terminal_task(monkeypatch): from src.job_reaper import JobReaper tid = _make_task("PL-30", "ORCH-330", stage="development") jid = _make_job(tid, status="running", pid=999999, attempts=1, max_attempts=2) # Task is flipped to cancelled (as STOP would) while the job is still running. mark_task_cancelled(tid) reaper = JobReaper() job = get_job(jid) reaper._reap_unknown_outcome(job, reason="dead pid") # NOT requeued (attempts terminal 'cancelled'. assert _job_status(jid) == "cancelled" # =========================================================================== TC-05 def test_tc05_full_reset_removes_branch_and_worktree(monkeypatch): calls = _stub_full_reset(monkeypatch) tid = _make_task("PL-40", "ORCH-340", stage="review", branch="feature/ORCH-340-x") stage_engine.cancel_task(tid) assert calls["worktree"] == [("orchestrator", "feature/ORCH-340-x")] assert calls["branch"] == [("orchestrator", "feature/ORCH-340-x")] def test_tc05_delete_remote_branch_refuses_main(): from src import gitea # main is never deletable by the cancel path (self-hosting safety, NFR-3). assert gitea.delete_remote_branch("orchestrator", "main") is False assert gitea.delete_remote_branch("orchestrator", "master") is False # =========================================================================== TC-06 def test_tc06_docs_and_task_row_survive(monkeypatch, tmp_path): _stub_full_reset(monkeypatch) tid = _make_task("PL-50", "ORCH-350", stage="development") # A stand-in docs artefact: cancel must not delete it. docs = tmp_path / "docs" / "work-items" / "ORCH-350" docs.mkdir(parents=True) (docs / "02-trz.md").write_text("trz") stage_engine.cancel_task(tid) assert (docs / "02-trz.md").exists(), "docs artefacts must be preserved" # The task ROW is kept (durable audit), flipped to cancelled. assert get_task(tid)["stage"] == "cancelled" # =========================================================================== TC-07 def test_tc07_idempotent_on_cancelled_done_missing(monkeypatch): calls = _stub_full_reset(monkeypatch) # already cancelled tid = _make_task("PL-60", "ORCH-360", stage="cancelled") res = stage_engine.cancel_task(tid) assert res["ok"] and res["note"].startswith("already-terminal") assert calls["stop"] == [] and calls["branch"] == [] # done tid2 = _make_task("PL-61", "ORCH-361", stage="done") res2 = stage_engine.cancel_task(tid2) assert res2["note"].startswith("already-terminal") # missing res3 = stage_engine.cancel_task(999999) assert res3["note"] == "no-task" # =========================================================================== TC-08 def test_tc08_kill_switch_off_inert(monkeypatch): monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False) assert cancel_mod.applies("orchestrator") is False @pytest.mark.asyncio async def test_tc08_kill_switch_off_handle_stop_noop(monkeypatch): monkeypatch.setattr(cfg.settings, "stop_status_enabled", False, raising=False) calls = _stub_full_reset(monkeypatch) from src.webhooks import plane as plane_wh tid = _make_task("PL-70", "ORCH-370", stage="development") _make_job(tid, status="running", pid=4242) await plane_wh.handle_stop({"id": "PL-70"}, "proj") # Nothing was cancelled (kill-switch off -> applies() False -> no-op). assert calls["stop"] == [] assert get_task(tid)["stage"] == "development" def test_tc08_scope_csv(monkeypatch): monkeypatch.setattr(cfg.settings, "stop_status_repos", "enduro-trails", raising=False) assert cancel_mod.applies("enduro-trails") is True assert cancel_mod.applies("orchestrator") is False # =========================================================================== TC-09 def test_tc09_queue_has_stop_block_and_keeps_keys(monkeypatch): import asyncio from src import main payload = asyncio.run(main.queue()) for key in ("counts", "serial_gate", "task_deps", "auto_labels", "recent"): assert key in payload, f"existing /queue key '{key}' preserved" assert "stop" in payload blk = payload["stop"] assert blk["enabled"] is True assert "repos" in blk and "cancelled_count" in blk and "recent" in blk def test_tc09_snapshot_never_raises(monkeypatch): # Force a DB error inside the snapshot -> minimal dict, no raise. monkeypatch.setattr("src.db.cancelled_tasks_snapshot", lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom"))) snap = cancel_mod.snapshot() assert snap["enabled"] is True and snap["cancelled_count"] == 0 # =========================================================================== TC-10 @pytest.mark.asyncio async def test_tc10_relaunch_hole_closed_midpipeline(monkeypatch): from src.webhooks import plane as plane_wh monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False) monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False) tid = _make_task("PL-80", "ORCH-380", stage="development") await plane_wh.handle_status_start({"id": "PL-80"}, "proj") # No stage agent was relaunched (no job created) for a mid-pipeline task. conn = get_db() n = conn.execute("SELECT COUNT(*) FROM jobs WHERE task_id=?", (tid,)).fetchone()[0] conn.close() assert n == 0 # =========================================================================== TC-11 @pytest.mark.asyncio async def test_tc11_analysis_idle_relaunches_analyst(monkeypatch): from src.webhooks import plane as plane_wh monkeypatch.setattr("src.plane_sync.add_comment", lambda *a, **k: None, raising=False) monkeypatch.setattr("src.plane_sync.set_issue_analysis", lambda *a, **k: None, raising=False) tid = _make_task("PL-90", "ORCH-390", stage="analysis") await plane_wh.handle_status_start({"id": "PL-90"}, "proj") conn = get_db() rows = conn.execute("SELECT agent FROM jobs WHERE task_id=?", (tid,)).fetchall() conn.close() assert [r[0] for r in rows] == ["analyst"], "analyst resume is still legitimate" @pytest.mark.asyncio async def test_tc11_new_task_starts_pipeline(monkeypatch): from src.webhooks import plane as plane_wh started = [] async def _stub_start(data, project_id=""): started.append(data.get("id")) monkeypatch.setattr(plane_wh, "start_pipeline", _stub_start) await plane_wh.handle_status_start({"id": "PL-NEW"}, "proj") assert started == ["PL-NEW"] # the ONLY pipeline-start entry point # =========================================================================== TC-12 def test_tc12_reconciler_skips_cancelled(monkeypatch): from src.reconciler import Reconciler # Avoid any Plane network in the gate pass. monkeypatch.setattr("src.reconciler.fetch_issue_state", lambda *a, **k: (_ for _ in ()).throw(AssertionError("no net")), raising=False) tid = _make_task("PL-100", "ORCH-400", stage="development") mark_task_cancelled(tid) rec = Reconciler() rec.reconcile_gate_once() assert rec.skipped_terminal_total == 1 def test_tc12_requeue_running_does_not_revive_cancelled(monkeypatch): tid = _make_task("PL-101", "ORCH-401", stage="development") jc = _make_job(tid, status="running", pid=None) cancel_jobs_for_task(tid) # -> cancelled assert _job_status(jc) == "cancelled" # Startup recovery flips only 'running' jobs; a cancelled job is untouched. requeue_running_jobs() assert _job_status(jc) == "cancelled" # =========================================================================== TC-13 def test_tc13_end_to_end_stop(monkeypatch): calls = _stub_full_reset(monkeypatch) tid = _make_task("PL-110", "ORCH-410", stage="review", branch="feature/ORCH-410-e2e") jr = _make_job(tid, status="running", pid=5555, run_id=11) jq = _make_job(tid, status="queued") res = stage_engine.cancel_task(tid, reason="Plane STOP status") assert res["ok"] and not res["deferred"] # agent stopped assert calls["stop"] and calls["stop"][0][0] == 5555 # jobs cancelled assert _job_status(jr) == "cancelled" and _job_status(jq) == "cancelled" # worktree + branch removed assert calls["worktree"] and calls["branch"] # durable terminal + key tombstone (re-create via To Analyse no longer collides) t = get_task(tid) assert t["stage"] == "cancelled" and t["cancelled_at"] assert t["plane_id"].endswith(f"#cancelled-{tid}") assert t["work_item_id"].endswith(f"#cancelled-{tid}") # plane_issue_id is tombstoned too (the lookup ORs on it) but the original UUID # remains recoverable from the parseable suffix (audit link preserved). assert t["plane_issue_id"] == f"PL-110#cancelled-{tid}" assert t["plane_issue_id"].split("#cancelled-")[0] == "PL-110" assert get_task_by_plane_id("PL-110") is None # freed for a fresh start # =========================================================================== TC-14 def test_tc14_migration_idempotent_and_columns_present(): # Re-running init_db must not fail (idempotent _ensure_column). init_db() init_db() conn = get_db() cols = {r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()} conn.close() assert "cancelled_at" in cols and "cancel_requested_at" in cols def test_tc14_existing_contracts_intact(): # The additive job status set still has the original statuses working. tid = _make_task("PL-120", "ORCH-420") jid = _make_job(tid, status="queued") # A queued job is still claimable when no gate blocks it. claimed = claim_next_job() assert claimed is not None and claimed["id"] == jid # =========================================================================== D7 def test_d7_stop_in_critical_window_defers(monkeypatch): calls = _stub_full_reset(monkeypatch) from src import self_deploy tid = _make_task("PL-130", "ORCH-430", stage="deploy", branch="feature/ORCH-430-d") # self-deploy Phase B initiated -> critical window. self_deploy.write_marker("orchestrator", "ORCH-430", self_deploy.INITIATED, content="1") jq = _make_job(tid, status="queued") jr = _make_job(tid, status="running", pid=7777) # the deploy actor res = stage_engine.cancel_task(tid) assert res["deferred"] is True and res["ok"] # Only queued jobs cancelled; the running deploy actor is NOT killed. assert _job_status(jq) == "cancelled" assert _job_status(jr) == "running" assert calls["stop"] == [] and calls["branch"] == [] # The deferred flag is durable; the task is NOT yet terminal. t = get_task(tid) assert t["cancel_requested_at"] and t["stage"] == "deploy" def test_d7_in_critical_window_detection(monkeypatch): from src import self_deploy task = {"repo": "orchestrator", "work_item_id": "ORCH-431", "branch": "feature/x"} assert cancel_mod.in_critical_window(task) is False self_deploy.write_marker("orchestrator", "ORCH-431", self_deploy.INITIATED, content="1") assert cancel_mod.in_critical_window(task) is True def test_d7_lease_held_idle_parking_is_not_critical(monkeypatch): """ORCH-090 review P1: a task PARKED on `deploy` awaiting Confirm Deploy holds the merge-lease but is fully reversible -> NOT a critical window (else the deferred cancel is never applied and the task wedges).""" from src import merge_gate os.makedirs(cfg.settings.repos_dir, exist_ok=True) branch = "feature/ORCH-432-park" tid = _make_task("PL-432", "ORCH-432", stage="deploy", branch=branch) # Lease HELD by this task's branch, NO INITIATED marker, NO running job. acquired, _ = merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-432") assert acquired assert merge_gate.current_lease_holder("orchestrator") == branch task = get_task(tid) assert cancel_mod.in_critical_window(task) is False def test_d7_lease_held_with_running_actor_still_critical(monkeypatch): """Lease held AND a deploy/merge actor actually running -> still critical (defer).""" from src import merge_gate os.makedirs(cfg.settings.repos_dir, exist_ok=True) branch = "feature/ORCH-433-merge" tid = _make_task("PL-433", "ORCH-433", stage="deploy", branch=branch) merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-433") _make_job(tid, status="running", pid=9191) # the merge/deploy actor task = get_task(tid) assert cancel_mod.in_critical_window(task) is True def test_d7_stop_on_deploy_awaiting_confirm_full_resets(monkeypatch): """End-to-end of the P1 fix: STOP while parked on `deploy` awaiting Confirm Deploy -> immediate FULL reset (terminal cancelled, branch deleted, lease released).""" calls = _stub_full_reset(monkeypatch) from src import merge_gate os.makedirs(cfg.settings.repos_dir, exist_ok=True) branch = "feature/ORCH-434-park" tid = _make_task("PL-434", "ORCH-434", stage="deploy", branch=branch) merge_gate.acquire_merge_lease("orchestrator", branch, "ORCH-434") res = stage_engine.cancel_task(tid) assert res["ok"] and not res["deferred"] assert res["note"] == "cancelled" # Durable terminal + branch deleted -> repo no longer wedged. assert get_task(tid)["stage"] == "cancelled" assert calls["branch"], "full reset deletes the remote feature branch" # The held lease was released (step 3c) -> the repo's serial-gate is unblocked. assert merge_gate.current_lease_holder("orchestrator") is None def test_d7_repeated_stop_in_critical_window_no_duplicate_notify(monkeypatch): """AC-6 / P2: a repeated STOP while still deferred does not re-notify.""" _stub_full_reset(monkeypatch) from src import self_deploy notifies = [] monkeypatch.setattr(stage_engine, "_notify_cancel", lambda *a, **k: notifies.append(a), raising=True) tid = _make_task("PL-435", "ORCH-435", stage="deploy", branch="feature/ORCH-435-d") self_deploy.write_marker("orchestrator", "ORCH-435", self_deploy.INITIATED, content="1") r1 = stage_engine.cancel_task(tid) r2 = stage_engine.cancel_task(tid) assert r1["deferred"] and r1["note"] == "deferred-critical-window" assert r2["deferred"] and r2["note"] == "deferred-already-pending" assert len(notifies) == 1, "only the first deferral transition notifies" def test_d7_deferred_applied_by_finalizer(monkeypatch): """After the irreversible step finishes, the finalizer applies the deferred cancel.""" calls = _stub_full_reset(monkeypatch) tid = _make_task("PL-140", "ORCH-440", stage="development", branch="feature/ORCH-440-d") # Mark a deferred cancellation pending (as the critical-window path would). db.set_task_cancel_requested(tid) # force=True is what run_deploy_finalizer uses once the step completed honestly. res = stage_engine.cancel_task(tid, force=True, source="deferred") assert res["ok"] and not res["deferred"] assert get_task(tid)["stage"] == "cancelled" assert calls["branch"], "deferred cancel applies the full reset"