Files
orchestrator/tests/test_deploy_restart_merge_recovery.py

117 lines
4.7 KiB
Python

"""ORCH-071 TC-10 (AC-3/G3) — merge survives a restart during Phase B (smoke).
Scenario: the prod container "dies" during Phase B BEFORE the feature PR is merged
(the holder of the merge step is gone). Because the merge runs in the
restart-surviving Phase C finalizer (deploy->done under-gate), a re-drive of the
finalizer in the NEW container catches the merge up: it merges the PR, the verifier
turns green and the task finally reaches ``done`` — never stuck without an alert and
never ``done`` without a confirmed merge.
The first finalizer pass models "died before merge": the merge-actor cannot complete
and the verifier is red -> HOLD + alert (task stays on ``deploy``). The second pass
models the re-drive after the restart: the merge lands, verify is green -> ``done``.
"""
import os
import tempfile
import pytest
# Configure the environment BEFORE the `src` imports below; those imports are
# deliberately placed after these assignments (hence their `noqa: E402`).
# NOTE(review): presumably the `src` settings read these variables at import
# time — confirm against src settings.
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_merge_recovery.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
# setdefault keeps any token value that is already present in the environment.
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
    """Give every test an empty database file and test-local settings.

    The DB settings are pointed at ``_test_db``, any file left there is removed
    and ``init_db()`` is re-run. Repo directories are redirected to ``tmp_path``,
    the deploy-log / merged-to-main writers are stubbed to return ``True``, merge
    verification is switched on (with an empty repo list) and the post-deploy
    monitor is switched off. All patches are undone by ``monkeypatch``.
    """
    # The DB path must be patched and the stale file removed before init_db().
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()

    repos_root = str(tmp_path)
    for dir_setting in ("repos_dir", "host_repos_dir"):
        monkeypatch.setattr(self_deploy.settings, dir_setting, repos_root)

    # Each stubbed writer gets its own mock so call records stay separate.
    for writer_name in ("write_deploy_log", "record_merged_to_main"):
        monkeypatch.setattr(
            stage_engine.self_deploy, writer_name, MagicMock(return_value=True)
        )

    merge_settings = stage_engine.merge_gate.settings
    monkeypatch.setattr(merge_settings, "merge_verify_enabled", True)
    monkeypatch.setattr(merge_settings, "merge_verify_repos", "")

    monkeypatch.setattr(
        stage_engine.post_deploy.settings, "post_deploy_monitor_enabled", False
    )
    yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """Replace the notification / issue-state callables on ``stage_engine`` with mocks.

    Every listed attribute gets its own ``MagicMock`` so tests can inspect each
    one independently; ``merge_gate.release_merge_lease`` is mocked as well.
    """
    notifier_names = (
        "notify_stage_change", "notify_qg_failure", "notify_approve_requested",
        "send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
    )
    issue_state_names = (
        "set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
        "set_issue_blocked", "set_issue_done", "set_issue_analysis",
        "set_issue_awaiting_deploy", "set_issue_deploying", "set_issue_monitoring",
    )
    for attr_name in (*notifier_names, *issue_state_names):
        monkeypatch.setattr(stage_engine, attr_name, MagicMock())

    monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", MagicMock())
def _stage(task_id):
    """Return the ``stage`` column of the ``tasks`` row whose id is *task_id*.

    Args:
        task_id: primary key of the row to look up.

    Returns:
        The value stored in the row's ``stage`` column.

    Raises:
        LookupError: if no ``tasks`` row has that id (instead of an opaque
            ``TypeError`` from subscripting ``None``).
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
    finally:
        # Close even when the query raises so a failing test does not leak the handle.
        conn.close()
    if row is None:
        raise LookupError(f"no tasks row with id={task_id!r}")
    return row[0]
def test_tc10_merge_recovers_after_restart(monkeypatch):
    """TC-10: a finalizer re-drive after a restart completes the interrupted merge.

    The first ``run_deploy_finalizer`` call sees a failed merge and a red verifier,
    so the task must stay on ``deploy`` and be marked blocked. The second call sees
    the merge succeed and the verifier turn green, so the task must reach ``done``.
    """
    self_deploy.write_marker("orchestrator", "ORCH-071", self_deploy.RESULT, "0")

    # Force the deploy-status quality gate to pass; every other check is kept.
    quality_gates = dict(stage_engine.QG_CHECKS)
    quality_gates["check_deploy_status"] = lambda *_args, **_kwargs: (True, "ok")
    monkeypatch.setattr(stage_engine, "QG_CHECKS", quality_gates)

    # Shared state for the fakes: the first merge attempt (pre-restart) fails,
    # any later attempt (the re-driven finalizer) merges and flips the verifier.
    merge_state = {"attempts": 0, "merged": False}

    def fake_merge_pr(repo, branch):
        merge_state["attempts"] += 1
        interrupted = merge_state["attempts"] == 1
        if not interrupted:
            merge_state["merged"] = True
            return (True, "merged PR #1")
        return (False, "interrupted by restart")

    def fake_verify(repo, branch, sha):
        return merge_state["merged"]

    monkeypatch.setattr(stage_engine.merge_gate, "merge_pr", fake_merge_pr)
    monkeypatch.setattr(stage_engine.merge_gate, "verify_merged_to_main", fake_verify)

    # Seed one task that is already sitting on the ``deploy`` stage.
    db = get_db()
    inserted = db.execute(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
        ("plane-ORCH-071", "ORCH-071", "orchestrator", "feature/ORCH-071-x", "deploy"),
    )
    task_id = inserted.lastrowid
    db.commit()
    db.close()

    job = {
        "task_id": task_id,
        "repo": "orchestrator",
        "id": 1,
        "agent": "deploy-finalizer",
    }

    # Pass 1 (process died before merge): HOLD — not done, alerted, Blocked.
    stage_engine.run_deploy_finalizer(job)
    assert _stage(task_id) == "deploy"
    assert stage_engine.set_issue_blocked.called
    assert not stage_engine.set_issue_done.called

    # Pass 2 (finalizer re-driven after restart): merge lands, verify green -> done.
    stage_engine.run_deploy_finalizer(job)
    assert _stage(task_id) == "done"
    assert merge_state["merged"] is True