"""ORCH-094 — deterministic post-deploy-monitor termination (FR-3 / AC-3). Covers (04-test-plan.yaml): TC-06 after the window finishes (HEALTHY, ticks==budget -> set_issue_done + `done` marker) there are NO further status PATCHes for the task (a second tick is a no-op: 0 set_issue_* calls). TC-07 a tick at DB stage=done with a closed window OR a task cancelled mid-window -> immediate no-op: no status PATCH and no next-tick enqueue (zombie-tick excluded). TC-08 arm_monitor does not re-arm a task already in done (armed/done marker -> no-op), and a deploy->done re-drive after the window closed converges to Done instead of resurrecting Monitoring. """ import os import tempfile import pytest _test_db = os.path.join(tempfile.gettempdir(), "test_post_deploy_termination.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") from unittest.mock import MagicMock # noqa: E402 import src.db as _db # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import stage_engine # noqa: E402 from src import post_deploy # noqa: E402 from src import config as cfg # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(monkeypatch, tmp_path): monkeypatch.setattr(_db.settings, "db_path", _test_db) if os.path.exists(_test_db): os.unlink(_test_db) init_db() monkeypatch.setattr(post_deploy.settings, "repos_dir", str(tmp_path)) monkeypatch.setattr(post_deploy.settings, "host_repos_dir", str(tmp_path)) # Small window so the budget is 1 tick (window // interval). monkeypatch.setattr(stage_engine.settings, "post_deploy_window_s", 10) monkeypatch.setattr(stage_engine.settings, "post_deploy_interval_s", 10) monkeypatch.setattr(post_deploy.settings, "post_deploy_window_s", 10) monkeypatch.setattr(post_deploy.settings, "post_deploy_interval_s", 10) # write_post_deploy_log touches a worktree/git; stub it. monkeypatch.setattr(post_deploy, "write_post_deploy_log", MagicMock(return_value=True)) yield @pytest.fixture def spy_status(monkeypatch): setters = {} for name in ("set_issue_done", "set_issue_monitoring", "set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_blocked"): m = MagicMock() monkeypatch.setattr(stage_engine, name, m) setters[name] = m monkeypatch.setattr(stage_engine, "_notify_post_deploy", MagicMock()) return setters def _make_task(stage="done", repo="orchestrator", wi="ORCH-061", branch="feature/ORCH-061-x"): conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES (?, ?, ?, ?, ?)", (f"plane-{wi}", wi, repo, branch, stage), ) tid = cur.lastrowid conn.commit() conn.close() return tid def _jobs(): conn = get_db() rows = conn.execute("SELECT agent FROM jobs ORDER BY id").fetchall() conn.close() return [r[0] for r in rows] def _healthy(*a, **k): return post_deploy.ProbeResult(health_ok=True, total=2, fivexx=0, detail="ok") # --- TC-06 ------------------------------------------------------------------ def test_tc06_clean_finish_then_no_more_patches(spy_status, monkeypatch): monkeypatch.setattr(post_deploy, "probe_signals", _healthy) tid = _make_task("done") post_deploy.write_marker("orchestrator", "ORCH-061", post_deploy.ARMED, "armed") job = {"task_id": tid, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"} # Tick 1: budget==1, ticks==1 -> HEALTHY window exhausted -> finish. stage_engine.run_post_deploy_monitor(job) spy_status["set_issue_done"].assert_called_once_with("ORCH-061") assert post_deploy.has_marker("orchestrator", "ORCH-061", post_deploy.DONE) # No next tick was enqueued (window exhausted). assert _jobs() == [] # Tick 2 (e.g. duplicate job): DONE marker present -> no-op, ZERO new PATCHes. spy_status["set_issue_done"].reset_mock() stage_engine.run_post_deploy_monitor(job) spy_status["set_issue_done"].assert_not_called() spy_status["set_issue_monitoring"].assert_not_called() assert _jobs() == [] # --- TC-07 ------------------------------------------------------------------ def test_tc07_cancelled_mid_window_is_noop(spy_status, monkeypatch): monkeypatch.setattr(post_deploy, "probe_signals", _healthy) tid = _make_task("cancelled") post_deploy.write_marker("orchestrator", "ORCH-061", post_deploy.ARMED, "armed") job = {"task_id": tid, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"} stage_engine.run_post_deploy_monitor(job) # Zombie-tick guard: window closed, NO status PATCH, NO next tick. for name, m in spy_status.items(): m.assert_not_called() assert post_deploy.has_marker("orchestrator", "ORCH-061", post_deploy.DONE) assert _jobs() == [] def test_tc07_finished_window_is_noop(spy_status, monkeypatch): monkeypatch.setattr(post_deploy, "probe_signals", _healthy) tid = _make_task("done") # Window already finished (DONE marker present) -> no active basis to tick. post_deploy.write_marker("orchestrator", "ORCH-061", post_deploy.ARMED, "armed") post_deploy.mark_done("orchestrator", "ORCH-061") job = {"task_id": tid, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"} stage_engine.run_post_deploy_monitor(job) spy_status["set_issue_done"].assert_not_called() spy_status["set_issue_monitoring"].assert_not_called() assert _jobs() == [] # --- TC-08 ------------------------------------------------------------------ def test_tc08_arm_monitor_idempotent_no_rearm(monkeypatch): tid = _make_task("done") # First arm: writes ARMED + enqueues tick 1. assert post_deploy.arm_monitor("orchestrator", "ORCH-061", "feature/ORCH-061-x", tid) is True assert _jobs() == ["post-deploy-monitor"] # Second arm (re-drive deploy->done): ARMED present -> no-op, no new job. assert post_deploy.arm_monitor("orchestrator", "ORCH-061", "feature/ORCH-061-x", tid) is False assert _jobs() == ["post-deploy-monitor"] def test_tc08_redrive_after_window_closed_converges(spy_status, monkeypatch): # Guard ON, self-hosting. monkeypatch.setattr(cfg.settings, "deploy_status_guard_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "deploy_status_guard_repos", "", raising=False) _make_task("done") # Window armed then closed (a completed post-deploy observation). post_deploy.write_marker("orchestrator", "ORCH-061", post_deploy.ARMED, "armed") post_deploy.mark_done("orchestrator", "ORCH-061") # A stale re-drive calling the REAL guarded setter must converge to Done, not # resurrect Monitoring. (Use the real plane_sync setter via stage_engine import.) from src import plane_sync direct = MagicMock() done = MagicMock() monkeypatch.setattr(plane_sync, "_set_issue_state_direct", direct) monkeypatch.setattr(plane_sync, "set_issue_done", done) monkeypatch.setattr(plane_sync, "_resolve_project_id", lambda w=None, p=None: "proj-1") monkeypatch.setattr(plane_sync, "get_project_states", lambda pid: {"monitoring": "S-mon"}) plane_sync.set_issue_monitoring("ORCH-061", reason="advance:deploy->done") direct.assert_not_called() done.assert_called_once_with("ORCH-061")