Files
orchestrator/tests/test_post_deploy_integration.py
claude-bot 0dfddf93f0 feat(plane): осмысленная статусная модель Plane (слой B — индикация)
Приводит статусы доски Plane к смыслу стадий конвейера, сохраняя
инвариант «статус — индикация, а не управление». Меняется только слой B
(отображение: src/plane_sync.py + точки выставления статуса в
stage_engine.py/webhooks/plane.py/reconciler.py); слой A — машина стадий
src/stages.py::STAGE_TRANSITIONS — остаётся байт-в-байт неизменным (AC-21).

- 6 новых логических ключей статуса (to_analyse, analysis, code_review,
  awaiting_deploy, deploying, monitoring) + сеттеры и диспетчер
  set_issue_stage_state.
- Project-relative alias-fallback (BR-12): новый ключ деградирует на
  базовый UUID того же проекта → нулевая регрессия для enduro-trails.
- Самодеплой (ORCH-036) индицирует фазы: Awaiting Deploy / Deploying;
  terminal-sync для self-hosting → Monitoring after Deploy, для прочих →
  терминальный Done.
- Post-deploy монитор (ORCH-021): HEALTHY → Done, DEGRADED → Blocked
  (только индикация; self-hosting ALERT_ONLY, прод не трогается, BR-5).
- Reconciler: триггер старта/резюма на To Analyse; Guard 2 учитывает
  новые активные ожидания без расширения skip-set на алиасах.
- never-raise контракт сеттеров и резолвера состояний сохранён.
- Раскатка — созданием статусов в Plane оператором, без kill-switch.

Инварианты не менялись: STAGE_TRANSITIONS, QG_CHECKS (12 чеков),
check_deploy_status, exit-код-контракт хука, merge-gate, схема БД.

ADR: docs/work-items/ORCH-066/06-adr/ADR-001-plane-status-model.md
Тесты: test_plane_status_model, test_plane_to_analyse_resume,
test_plane_status_failclosed + TC в существующих наборах. 774 passed.

Refs: ORCH-066

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 22:02:45 +00:00

338 lines
16 KiB
Python

"""ORCH-021 integration tests — arming + tick orchestration (TC-16..TC-20).
Exercises the wiring in ``stage_engine`` (arm on deploy->done,
``run_post_deploy_monitor`` tick + reaction) and the ``/queue`` observability
block, with the network probe and the rollback hook mocked. Mirrors the
test_deploy_terminal_sync.py harness.
"""
import os
import tempfile
import pytest
# Environment for the code under test. These assignments sit above the
# ``src`` imports that follow (hence the E402 suppressions on those imports).
# NOTE(review): presumably ``src`` reads these variables at import time — confirm.
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_post_deploy.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
# setdefault: a token already present in the caller's environment is kept.
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import post_deploy # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
    """Start every test from an empty DB file and a private sentinel directory.

    Points ``src.db`` at the shared temp DB path, removes any file left there,
    calls ``init_db()``, redirects the repos directories read by
    ``post_deploy`` / ``stage_engine`` to pytest's ``tmp_path`` and stubs the
    post-deploy log writer.
    """
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()

    # State sentinels live under the tmp repos_dir (container view).
    sandbox = str(tmp_path)
    for module, attr in (
        (post_deploy, "repos_dir"),
        (post_deploy, "host_repos_dir"),
        (stage_engine, "repos_dir"),
    ):
        monkeypatch.setattr(module.settings, attr, sandbox)

    # The artefact write is best-effort; stub it so no worktree is needed.
    log_stub = MagicMock(return_value=True)
    monkeypatch.setattr(post_deploy, "write_post_deploy_log", log_stub)
    yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
    """Swap each listed ``stage_engine`` notifier / status setter for a fresh mock."""
    notifiers = (
        "notify_stage_change", "notify_qg_failure", "notify_approve_requested",
        "send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
    )
    base_setters = (
        "set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
        "set_issue_blocked", "set_issue_done",
    )
    # ORCH-066 status setters.
    orch066_setters = (
        "set_issue_analysis", "set_issue_awaiting_deploy", "set_issue_deploying",
        "set_issue_monitoring",
    )
    for attr in notifiers + base_setters + orch066_setters:
        # One independent MagicMock per name so call assertions do not bleed
        # between setters.
        monkeypatch.setattr(stage_engine, attr, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-021-x", wi="ORCH-021"):
    """Insert one row into ``tasks`` and return its row id.

    Args:
        stage: value stored in the ``stage`` column.
        repo: value stored in the ``repo`` column.
        branch: value stored in the ``branch`` column.
        wi: work item id; stored as ``work_item_id`` and used to build
            ``plane_id`` as ``plane-<wi>``.

    Returns:
        ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (f"plane-{wi}", wi, repo, branch, stage),
        )
        task_id = cur.lastrowid
        conn.commit()
    finally:
        # Close even when the INSERT/commit raises, so a failing test does not
        # leave an open handle on the shared temp database file.
        conn.close()
    return task_id
def _jobs(agent=None):
    """Return the ``agent`` column of rows in ``jobs``, ordered by ``id``.

    Args:
        agent: when truthy, only rows whose ``agent`` equals this value are
            returned; when falsy (``None`` or ``""``) every row is returned.

    Returns:
        A list of agent names, one entry per matching row.
    """
    conn = get_db()
    try:
        if agent:
            rows = conn.execute(
                "SELECT agent FROM jobs WHERE agent=? ORDER BY id", (agent,)
            ).fetchall()
        else:
            rows = conn.execute("SELECT agent FROM jobs ORDER BY id").fetchall()
    finally:
        # Release the connection even if the query fails (e.g. missing table),
        # so the next test can still remove the DB file.
        conn.close()
    return [row[0] for row in rows]
def _pass(*_args, **_kwargs):
    """Stand-in check: accept any call signature and always report ``(True, "ok")``."""
    verdict = (True, "ok")
    return verdict
def _drive_deploy_to_done(monkeypatch, task_id, repo="orchestrator",
                          branch="feature/ORCH-021-x", wi="ORCH-021"):
    """Advance a deploy-stage task to done through the real terminal block.

    ``check_deploy_status`` is replaced with an always-passing stub and the
    merge-lease release is mocked; the return value of
    ``stage_engine.advance_stage`` is passed through unchanged.
    """
    patched_checks = dict(stage_engine.QG_CHECKS)
    patched_checks["check_deploy_status"] = _pass
    monkeypatch.setattr(stage_engine, "QG_CHECKS", patched_checks)

    lease_release = MagicMock()
    monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", lease_release)

    return stage_engine.advance_stage(
        task_id=task_id,
        current_stage="deploy",
        repo=repo,
        work_item_id=wi,
        branch=branch,
        finished_agent="deployer",
    )
# ---------------------------------------------------------------------------
# TC-16 — arm on deploy->done (applicable repo only)
# ---------------------------------------------------------------------------
def test_tc16_arm_for_self_hosting(monkeypatch):
    """TC-16: deploy->done on the default ``orchestrator`` task arms the monitor."""
    cfg = post_deploy.settings
    monkeypatch.setattr(cfg, "post_deploy_monitor_enabled", True)
    monkeypatch.setattr(cfg, "post_deploy_repos", "")

    _drive_deploy_to_done(monkeypatch, _make_task("deploy"))

    # Both the ARMED sentinel and a queued monitor job must exist.
    armed = post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.ARMED)
    assert armed
    queued = _jobs("post-deploy-monitor")
    assert "post-deploy-monitor" in queued
def test_tc16_no_arm_for_nonself(monkeypatch):
    """TC-16: with an empty ``post_deploy_repos`` the ``enduro-trails`` task is not armed."""
    cfg = post_deploy.settings
    monkeypatch.setattr(cfg, "post_deploy_monitor_enabled", True)
    monkeypatch.setattr(cfg, "post_deploy_repos", "")

    target = {"repo": "enduro-trails", "branch": "feature/ET-9", "wi": "ET-9"}
    task_id = _make_task("deploy", **target)
    _drive_deploy_to_done(monkeypatch, task_id, **target)

    armed = post_deploy.has_marker("enduro-trails", "ET-9", post_deploy.ARMED)
    assert not armed
    queued = _jobs("post-deploy-monitor")
    assert queued == []
def test_tc16_no_arm_when_kill_switch_off(monkeypatch):
    """TC-16: with ``post_deploy_monitor_enabled`` False nothing is armed or queued."""
    monkeypatch.setattr(post_deploy.settings, "post_deploy_monitor_enabled", False)

    deploy_task = _make_task("deploy")
    _drive_deploy_to_done(monkeypatch, deploy_task)

    armed = post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.ARMED)
    assert not armed
    queued = _jobs("post-deploy-monitor")
    assert queued == []
# ---------------------------------------------------------------------------
# TC-17 — idempotent arm (double webhook)
# ---------------------------------------------------------------------------
def test_tc17_double_arm_is_noop(monkeypatch):
    """TC-17: a second ``arm_monitor`` call with the same arguments returns False."""
    monkeypatch.setattr(post_deploy.settings, "post_deploy_monitor_enabled", True)

    arm_args = ("orchestrator", "ORCH-021", "feature/ORCH-021-x", 1)
    first = post_deploy.arm_monitor(*arm_args)
    second = post_deploy.arm_monitor(*arm_args)

    assert first is True
    assert second is False
    # Exactly ONE monitor job enqueued despite two arm calls.
    queued = _jobs("post-deploy-monitor")
    assert queued == ["post-deploy-monitor"]
# ---------------------------------------------------------------------------
# TC-18 — DEGRADED -> non-self auto-rollback (hook mocked)
# ---------------------------------------------------------------------------
def test_tc18_degraded_nonself_rolls_back(monkeypatch):
    """TC-18: an unhealthy probe on a listed non-self repo invokes the rollback hook.

    The probe and the rollback hook are mocked. The test checks that the hook
    is called once with the repo, the DONE marker is present, the log writer
    received DEGRADED / ROLLBACK_OK, and the notifier was called.
    """
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_repos", "enduro-trails"),
        ("post_deploy_auto_rollback", True),
        ("post_deploy_fail_threshold", 1),
        ("post_deploy_window_s", 30),
        ("post_deploy_interval_s", 30),  # budget=1 tick
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_down(url):
        # Probe reports unhealthy.
        return post_deploy.ProbeResult(False, 2, 2, "down")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_down)

    rollback_hook = MagicMock(return_value=(0, "ok"))
    monkeypatch.setattr(post_deploy, "run_rollback", rollback_hook)
    notifier = MagicMock()
    monkeypatch.setattr(stage_engine, "_notify_post_deploy", notifier)
    log_writer = MagicMock(return_value=True)
    monkeypatch.setattr(post_deploy, "write_post_deploy_log", log_writer)

    task_id = _make_task("done", repo="enduro-trails", branch="feature/ET-9", wi="ET-9")
    post_deploy.write_marker("enduro-trails", "ET-9", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "enduro-trails", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    rollback_hook.assert_called_once_with("enduro-trails")
    assert post_deploy.has_marker("enduro-trails", "ET-9", post_deploy.DONE)
    # Artefact written with ROLLBACK_OK; a notification was sent.
    logged = log_writer.call_args[0]
    assert "DEGRADED" in logged
    assert "ROLLBACK_OK" in logged
    assert notifier.called
# ---------------------------------------------------------------------------
# TC-19 — self-hosting DEGRADED never rolls back, alerts instead
# ---------------------------------------------------------------------------
def test_tc19_degraded_self_hosting_alert_only(monkeypatch):
    """TC-19: an unhealthy probe on ``orchestrator`` alerts but never calls the rollback hook."""
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_auto_rollback", True),
        ("post_deploy_fail_threshold", 1),
        ("post_deploy_window_s", 30),
        ("post_deploy_interval_s", 30),
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_down(url):
        return post_deploy.ProbeResult(False, 2, 2, "down")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_down)

    # Rollback hook MUST NOT be called for self-hosting (AC-8 structural invariant).
    rollback_hook = MagicMock(return_value=(0, "ok"))
    monkeypatch.setattr(post_deploy, "run_rollback", rollback_hook)
    notifier = MagicMock()
    monkeypatch.setattr(stage_engine, "_notify_post_deploy", notifier)
    log_writer = MagicMock(return_value=True)
    monkeypatch.setattr(post_deploy, "write_post_deploy_log", log_writer)

    task_id = _make_task("done")
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    rollback_hook.assert_not_called()
    assert post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.DONE)
    logged = log_writer.call_args[0]
    assert "DEGRADED" in logged
    assert "ALERT_ONLY" in logged
    assert notifier.called
def test_healthy_tick_requeues_without_finishing(monkeypatch):
    """A healthy tick with window budget left enqueues another tick and leaves no DONE marker."""
    # HEALTHY and window not exhausted -> re-queue, do NOT mark done.
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_window_s", 90),
        ("post_deploy_interval_s", 30),  # budget=3
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_ok(url):
        return post_deploy.ProbeResult(True, 2, 0, "ok")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_ok)

    task_id = _make_task("done")
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    finished = post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.DONE)
    assert not finished
    # A follow-up tick job was enqueued.
    queued = _jobs("post-deploy-monitor")
    assert queued == ["post-deploy-monitor"]
def test_finished_window_tick_is_noop(monkeypatch):
    """AC-15: a tick that arrives after the window was marked done does nothing.

    The window is closed up-front with ``mark_done``; the tick must then
    neither call the probe nor leave a follow-up monitor job in the queue.
    """
    # AC-15: a tick after the window is done is a no-op (no new job, no re-probe).
    probe = MagicMock()
    monkeypatch.setattr(post_deploy, "probe_signals", probe)
    task_id = _make_task("done")
    post_deploy.mark_done("orchestrator", "ORCH-021")
    stage_engine.run_post_deploy_monitor(
        {"task_id": task_id, "repo": "orchestrator", "id": 9, "agent": "post-deploy-monitor"}
    )
    probe.assert_not_called()
    # The "no new job" half of AC-15 was previously stated but never checked:
    # the jobs table starts empty for each test, so it must still be empty.
    assert _jobs("post-deploy-monitor") == []
# ---------------------------------------------------------------------------
# ORCH-066 TC-10 (AC-10): HEALTHY + window exhausted -> Plane state Done.
# ---------------------------------------------------------------------------
def test_orch066_tc10_clean_window_close_sets_done(monkeypatch):
    """ORCH-066 TC-10: a healthy tick that exhausts the window sets the Plane state to Done."""
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_window_s", 30),
        ("post_deploy_interval_s", 30),  # budget=1
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_ok(url):
        return post_deploy.ProbeResult(True, 2, 0, "ok")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_ok)

    task_id = _make_task("done")
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    # Clean window close -> terminal Done indicated on Plane; window marked done.
    stage_engine.set_issue_done.assert_called_once_with("ORCH-021")
    stage_engine.set_issue_blocked.assert_not_called()
    assert post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.DONE)
    # No follow-up tick once the window closed.
    queued = _jobs("post-deploy-monitor")
    assert queued == []
# ---------------------------------------------------------------------------
# ORCH-066 TC-11 (AC-11): DEGRADED -> Plane state Blocked (self-hosting alert).
# ---------------------------------------------------------------------------
def test_orch066_tc11_degraded_sets_blocked(monkeypatch):
    """ORCH-066 TC-11: an unhealthy tick sets the Plane state to Blocked, not Done."""
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_fail_threshold", 1),
        ("post_deploy_window_s", 30),
        ("post_deploy_interval_s", 30),
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_down(url):
        return post_deploy.ProbeResult(False, 2, 2, "down")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_down)
    monkeypatch.setattr(stage_engine, "_notify_post_deploy", MagicMock())

    task_id = _make_task("done")
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    # DEGRADED -> Blocked indication (NOT Done); window finalised.
    stage_engine.set_issue_blocked.assert_called_once_with("ORCH-021")
    stage_engine.set_issue_done.assert_not_called()
    finished = post_deploy.has_marker("orchestrator", "ORCH-021", post_deploy.DONE)
    assert finished
# ---------------------------------------------------------------------------
# ORCH-066 TC-12 (AC-12): a self-hosting tick NEVER restarts/rolls back prod —
# the Blocked indication is the ONLY mutation (ORCH-021 BR-5 preserved).
# ---------------------------------------------------------------------------
def test_orch066_tc12_self_tick_never_restarts_prod(monkeypatch):
    """ORCH-066 TC-12: an unhealthy ``orchestrator`` tick sets Blocked and never calls rollback."""
    overrides = (
        ("post_deploy_monitor_enabled", True),
        ("post_deploy_auto_rollback", True),
        ("post_deploy_fail_threshold", 1),
        ("post_deploy_window_s", 30),
        ("post_deploy_interval_s", 30),
    )
    for attr, value in overrides:
        monkeypatch.setattr(post_deploy.settings, attr, value)

    def _probe_down(url):
        return post_deploy.ProbeResult(False, 2, 2, "down")

    monkeypatch.setattr(post_deploy, "probe_signals", _probe_down)
    monkeypatch.setattr(stage_engine, "_notify_post_deploy", MagicMock())

    # The rollback hook (the only restart-capable path) MUST stay untouched for self.
    rollback_hook = MagicMock(return_value=(0, "ok"))
    monkeypatch.setattr(post_deploy, "run_rollback", rollback_hook)

    task_id = _make_task("done")
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")
    tick = {"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "post-deploy-monitor"}
    stage_engine.run_post_deploy_monitor(tick)

    rollback_hook.assert_not_called()  # never restarts/rolls back the prod self-container
    stage_engine.set_issue_blocked.assert_called_once_with("ORCH-021")  # indication only
# ---------------------------------------------------------------------------
# TC-20 — /queue observability block
# ---------------------------------------------------------------------------
def test_tc20_queue_block_present(monkeypatch):
    """TC-20: ``post_deploy.status()`` lists an armed window and drops it once marked done."""
    monkeypatch.setattr(post_deploy.settings, "post_deploy_monitor_enabled", True)
    post_deploy.write_marker("orchestrator", "ORCH-021", post_deploy.ARMED, "armed")

    while_armed = post_deploy.status()
    assert while_armed["enabled"] is True
    expected_window = post_deploy.settings.post_deploy_window_s
    assert while_armed["window_s"] == expected_window
    assert "ORCH-021" in while_armed["active"]
    assert while_armed["active_count"] >= 1

    # A finished window drops out of "active".
    post_deploy.mark_done("orchestrator", "ORCH-021")
    after_done = post_deploy.status()
    assert "ORCH-021" not in after_done["active"]