103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
"""ORCH-036 TC-12/13: no silent deploy — both Plane AND Telegram are notified (AC-6).
|
|
|
|
The finalizer (Phase C) must announce the prod-deploy outcome on BOTH channels:
|
|
* TC-12 — a SUCCESS deploy -> a Plane comment AND a Telegram message.
|
|
* TC-13 — a FAILED deploy (rollback) -> a Plane comment AND a Telegram message.
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_notif.db")
|
|
os.environ["ORCH_DB_PATH"] = _test_db
|
|
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
|
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
|
|
from unittest.mock import MagicMock # noqa: E402
|
|
|
|
import src.db as _db # noqa: E402
|
|
from src.db import init_db, get_db # noqa: E402
|
|
from src import stage_engine # noqa: E402
|
|
from src import self_deploy # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def fresh_db(monkeypatch, tmp_path):
|
|
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
init_db()
|
|
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
|
|
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
|
|
monkeypatch.setattr(stage_engine.self_deploy, "write_deploy_log", MagicMock(return_value=True))
|
|
monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", MagicMock())
|
|
yield
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def silence_side_effects(monkeypatch):
|
|
for name in (
|
|
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
|
|
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
|
|
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
|
|
"set_issue_blocked", "set_issue_done",
|
|
):
|
|
monkeypatch.setattr(stage_engine, name, MagicMock())
|
|
|
|
|
|
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
|
|
conn = get_db()
|
|
cur = conn.execute(
|
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
|
|
"VALUES (?, ?, ?, ?, ?)",
|
|
(f"plane-{wi}", wi, repo, branch, stage),
|
|
)
|
|
task_id = cur.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
return task_id
|
|
|
|
|
|
def _pass(*a, **k):
|
|
return (True, "ok")
|
|
|
|
|
|
def _fail(reason):
|
|
def _f(*a, **k):
|
|
return (False, reason)
|
|
return _f
|
|
|
|
|
|
def _run_finalizer(task_id):
|
|
stage_engine.run_deploy_finalizer(
|
|
{"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "deploy-finalizer"}
|
|
)
|
|
|
|
|
|
def test_tc12_success_notifies_plane_and_telegram(monkeypatch):
|
|
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "0")
|
|
monkeypatch.setattr(
|
|
stage_engine, "QG_CHECKS",
|
|
{**stage_engine.QG_CHECKS, "check_deploy_status": _pass},
|
|
)
|
|
task_id = _make_task("deploy")
|
|
_run_finalizer(task_id)
|
|
assert stage_engine.plane_add_comment.called
|
|
assert stage_engine.send_telegram.called
|
|
|
|
|
|
def test_tc13_rollback_notifies_plane_and_telegram(monkeypatch):
|
|
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "1")
|
|
monkeypatch.setattr(
|
|
stage_engine, "QG_CHECKS",
|
|
{**stage_engine.QG_CHECKS, "check_deploy_status": _fail("Deploy status: FAILED")},
|
|
)
|
|
task_id = _make_task("deploy")
|
|
_run_finalizer(task_id)
|
|
# The БАГ-8 rollback path announces on both channels (no silent failure).
|
|
assert stage_engine.send_telegram.called
|
|
assert stage_engine.plane_add_comment.called or stage_engine.plane_notify_qg.called
|