"""ORCH-071 TC-10 (AC-3/G3) — merge survives a restart during Phase B (smoke). Scenario: the prod container "dies" during Phase B BEFORE the feature PR is merged (the holder of the merge step is gone). Because the merge runs in the restart-surviving Phase C finalizer (deploy->done under-gate), a re-drive of the finalizer in the NEW container catches the merge up: it merges the PR, the verifier turns green and the task finally reaches ``done`` — never stuck without an alert and never ``done`` without a confirmed merge. The first finalizer pass models "died before merge": the merge-actor cannot complete and the verifier is red -> HOLD + alert (task stays on ``deploy``). The second pass models the re-drive after the restart: the merge lands, verify is green -> ``done``. """ import os import tempfile import pytest _test_db = os.path.join(tempfile.gettempdir(), "test_orch_merge_recovery.db") os.environ["ORCH_DB_PATH"] = _test_db os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") from unittest.mock import MagicMock # noqa: E402 import src.db as _db # noqa: E402 from src.db import init_db, get_db # noqa: E402 from src import stage_engine # noqa: E402 from src import self_deploy # noqa: E402 @pytest.fixture(autouse=True) def fresh_db(monkeypatch, tmp_path): monkeypatch.setattr(_db.settings, "db_path", _test_db) if os.path.exists(_test_db): os.unlink(_test_db) init_db() monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path)) monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path)) monkeypatch.setattr(stage_engine.self_deploy, "write_deploy_log", MagicMock(return_value=True)) monkeypatch.setattr(stage_engine.merge_gate.settings, "merge_verify_enabled", True) monkeypatch.setattr(stage_engine.merge_gate.settings, "merge_verify_repos", "") monkeypatch.setattr( stage_engine.self_deploy, "record_merged_to_main", MagicMock(return_value=True) ) monkeypatch.setattr(stage_engine.post_deploy.settings, "post_deploy_monitor_enabled", False) yield @pytest.fixture(autouse=True) def silence_side_effects(monkeypatch): for name in ( "notify_stage_change", "notify_qg_failure", "notify_approve_requested", "send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment", "set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress", "set_issue_blocked", "set_issue_done", "set_issue_analysis", "set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring", ): monkeypatch.setattr(stage_engine, name, MagicMock()) monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", MagicMock()) def _stage(task_id): conn = get_db() row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone() conn.close() return row[0] def test_tc10_merge_recovers_after_restart(monkeypatch): self_deploy.write_marker("orchestrator", "ORCH-071", self_deploy.RESULT, "0") monkeypatch.setattr( stage_engine, "QG_CHECKS", {**stage_engine.QG_CHECKS, "check_deploy_status": lambda *a, **k: (True, "ok")}, ) # Stateful merge: the FIRST attempt (pre-restart) cannot complete; the SECOND # (the re-driven finalizer after the restart) merges and the verifier goes green. state = {"attempts": 0, "merged": False} def fake_merge_pr(repo, branch): state["attempts"] += 1 if state["attempts"] == 1: return (False, "interrupted by restart") state["merged"] = True return (True, "merged PR #1") def fake_verify(repo, branch, sha): return state["merged"] monkeypatch.setattr(stage_engine.merge_gate, "merge_pr", fake_merge_pr) monkeypatch.setattr(stage_engine.merge_gate, "verify_merged_to_main", fake_verify) conn = get_db() cur = conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)", ("plane-ORCH-071", "ORCH-071", "orchestrator", "feature/ORCH-071-x", "deploy"), ) task_id = cur.lastrowid conn.commit() conn.close() job = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "deploy-finalizer"} # Pass 1 (process died before merge): HOLD — not done, alerted, Blocked. stage_engine.run_deploy_finalizer(job) assert _stage(task_id) == "deploy" assert stage_engine.set_issue_blocked.called assert not stage_engine.set_issue_done.called # Pass 2 (finalizer re-driven after restart): merge lands, verify green -> done. stage_engine.run_deploy_finalizer(job) assert _stage(task_id) == "done" assert state["merged"] is True