developer(ET): auto-commit from developer run_id=192
Some checks failed
CI / test (push) Failing after 17s

This commit is contained in:
2026-06-06 20:01:07 +00:00
parent 1acbb1d28d
commit be980e2a44
18 changed files with 1690 additions and 2 deletions

View File

@@ -0,0 +1,160 @@
"""ORCH-036 TC-04/05/06: the manual-approve gate for the executable self-deploy.
Contract (AC-5, AC-12):
* TC-04 — ``deploy_require_manual_approve`` defaults to True in settings.
* TC-05 — flag true + NO human approve -> the prod hook is NEVER called; the
deploy-staging -> deploy edge only advances the STAGE and requests an approve
(Phase A). ``initiate_deploy`` / ssh subprocess must not be touched.
* TC-06 — flag true + a human Approved -> the prod hook is launched EXACTLY once
(Phase B), idempotent on a repeated Approved (the ``initiated`` marker guards).
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_approve.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Isolate the sentinel state dirs to a per-test tmp dir.
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _pass(*a, **k):
return (True, "ok")
# ---------------------------------------------------------------------------
# TC-04: default flag value
# ---------------------------------------------------------------------------
def test_tc04_manual_approve_default_true():
"""The fresh, un-overridden settings default must be True (safe-by-default)."""
from src.config import Settings
assert Settings().deploy_require_manual_approve is True
# ---------------------------------------------------------------------------
# TC-05: flag true, no approve -> prod hook NOT called (Phase A only)
# ---------------------------------------------------------------------------
def test_tc05_no_approve_does_not_call_prod_hook(monkeypatch):
monkeypatch.setattr(stage_engine.settings, "deploy_require_manual_approve", True)
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _pass,
"check_branch_mergeable": _pass},
)
# Spy: the deploy launcher must never run on the staging->deploy edge.
initiate = MagicMock()
monkeypatch.setattr(stage_engine.self_deploy, "initiate_deploy", initiate)
ssh_run = MagicMock()
monkeypatch.setattr(self_deploy.subprocess, "run", ssh_run)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "orchestrator", "ORCH-036",
"feature/ORCH-036-x", finished_agent="deployer",
)
# Phase A: advanced the STAGE to deploy, but requested approve — no prod hook.
assert res.advanced is True
assert res.to_stage == "deploy"
assert _stage(task_id) == "deploy"
assert res.note == "self-deploy-approval-pending"
initiate.assert_not_called()
ssh_run.assert_not_called()
# No deployer job: the human Approved (Phase B) is what triggers the deploy.
assert _jobs() == []
# The restart-safe approve-requested marker was written.
assert self_deploy.has_marker("orchestrator", "ORCH-036", self_deploy.APPROVE_REQUESTED)
# ---------------------------------------------------------------------------
# TC-06: flag true + Approved -> prod hook called exactly once (idempotent)
# ---------------------------------------------------------------------------
def test_tc06_approved_calls_prod_hook_exactly_once(monkeypatch):
monkeypatch.setattr(stage_engine.settings, "deploy_require_manual_approve", True)
monkeypatch.setattr(stage_engine.settings, "deploy_ssh_host", "mva154")
# Real initiate_deploy, but the ssh subprocess is mocked (rc=0 -> dispatched).
ssh_run = MagicMock(return_value=MagicMock(returncode=0, stdout="", stderr=""))
monkeypatch.setattr(self_deploy.subprocess, "run", ssh_run)
task_id = _make_task("deploy") # already on deploy, awaiting Approved
# 1st human Approved -> Phase B initiates the detached deploy.
res1 = advance_stage(
task_id, "deploy", "orchestrator", "ORCH-036",
"feature/ORCH-036-x", finished_agent=None,
)
assert res1.note == "self-deploy-initiated"
assert ssh_run.call_count == 1
# The finalizer was enqueued.
assert any(j["agent"] == "deploy-finalizer" for j in _jobs())
assert self_deploy.has_marker("orchestrator", "ORCH-036", self_deploy.INITIATED)
# 2nd (duplicate) Approved -> idempotent no-op, hook NOT called again.
res2 = advance_stage(
task_id, "deploy", "orchestrator", "ORCH-036",
"feature/ORCH-036-x", finished_agent=None,
)
assert res2.note == "self-deploy-already-initiated"
assert ssh_run.call_count == 1 # still exactly one prod deploy

View File

@@ -0,0 +1,47 @@
"""ORCH-036 TC-14: prod deploy is build-ONCE — retag the staging image, no rebuild (AC-7).
The detached prod-deploy command must pass ``SOURCE_IMAGE=<staging-image>`` to the
hook so it retags the staging-validated image onto the prod tag instead of running
``docker build``. We assert the composed ssh command carries the staging source
image and never asks the hook to build.
"""
import os
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from src import self_deploy # noqa: E402
def test_tc14_deploy_command_retags_staging_image_no_build(monkeypatch):
monkeypatch.setattr(self_deploy.settings, "deploy_ssh_user", "slin")
monkeypatch.setattr(self_deploy.settings, "deploy_ssh_host", "mva154")
monkeypatch.setattr(
self_deploy.settings, "deploy_prod_source_image", "orchestrator-orchestrator-staging"
)
cmd = self_deploy.build_deploy_command("orchestrator", "ORCH-036", "feature/ORCH-036-x")
remote = cmd[-1]
# The prevalidated staging image is handed to the hook as SOURCE_IMAGE (build-once).
assert "SOURCE_IMAGE=orchestrator-orchestrator-staging" in remote
# No rebuild is requested in the remote command.
assert "docker build" not in remote
assert "--build" not in remote
def test_tc14_hook_retag_branch_present():
"""The hook itself must honour SOURCE_IMAGE by retagging (no rebuild)."""
import pathlib
hook = pathlib.Path(__file__).resolve().parents[1] / "scripts" / "orchestrator-deploy-hook.sh"
text = hook.read_text(encoding="utf-8")
assert 'SOURCE_IMAGE="${SOURCE_IMAGE:-}"' in text
# Build-once retag branch present; the hook never runs `docker build`.
assert 'docker tag "$SOURCE_IMAGE" "$TARGET_IMAGE"' in text
# No EXECUTABLE `docker build` line (comments mentioning it are fine).
exec_lines = [
ln.strip() for ln in text.splitlines()
if ln.strip() and not ln.strip().startswith("#")
]
assert not any("docker build" in ln for ln in exec_lines)

View File

@@ -0,0 +1,47 @@
"""ORCH-036 TC-01/02/03: deterministic exit-code -> deploy_status mapping.
The finalizer (Phase C) maps the host-hook exit-code to the machine verdict via a
PURE function (no LLM, no I/O), so it is unit-testable in isolation. Contract
(hook exit-code 0/1/2, AC-1/AC-3): 0 -> SUCCESS; 1 (rolled back), 2 (rollback also
failed), and anything else -> FAILED (fail-closed).
"""
import os
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from src.self_deploy import map_exit_code_to_status, build_deploy_log # noqa: E402
def test_tc01_exit0_maps_to_success():
assert map_exit_code_to_status(0) == "SUCCESS"
def test_tc02_exit1_rolled_back_maps_to_failed():
assert map_exit_code_to_status(1) == "FAILED"
def test_tc03_exit2_rollback_also_failed_maps_to_failed():
assert map_exit_code_to_status(2) == "FAILED"
def test_other_exit_codes_map_to_failed():
for code in (3, 127, 255, -1):
assert map_exit_code_to_status(code) == "FAILED"
def test_non_int_or_none_maps_to_failed_fail_closed():
assert map_exit_code_to_status(None) == "FAILED"
assert map_exit_code_to_status("garbage") == "FAILED"
def test_deploy_log_frontmatter_carries_status():
"""The rendered log must expose deploy_status in YAML frontmatter so the
existing _parse_deploy_status contract (AC-10) reads the right verdict."""
body_ok = build_deploy_log("ORCH-036", 0, "SUCCESS")
assert body_ok.startswith("---\n")
assert "deploy_status: SUCCESS" in body_ok
body_fail = build_deploy_log("ORCH-036", 2, "FAILED")
assert "deploy_status: FAILED" in body_fail
assert "hook_exit_code: 2" in body_fail

View File

@@ -0,0 +1,118 @@
"""ORCH-036 TC-19: deploy-hook auto-rollback simulation (AC-9).
Drives the REAL ``scripts/orchestrator-deploy-hook.sh`` in a hermetic sandbox:
``docker`` / ``curl`` / ``git`` / ``sleep`` are replaced by PATH-shimmed stubs so
no real infra is touched (and prod is never restarted — INFRA safety). The curl
stub is stateful: the freshly-deployed service is UNHEALTHY for the whole deploy
health-check window, which must trigger the hook's AUTO-ROLLBACK; after the
rollback restart the previous image is HEALTHY again.
Expected hook contract (exit-code 0/1/2):
* health fails -> auto rollback -> previous image healthy -> exit 1 (rolled back);
* the whole run completes well under the 60s MTTR budget (sleeps are shimmed).
"""
import os
import shutil
import stat
import subprocess
import time
import pytest
HOOK = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"scripts", "orchestrator-deploy-hook.sh",
)
pytestmark = pytest.mark.skipif(
shutil.which("bash") is None, reason="bash required for hook simulation"
)
def _write_exec(path, content):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
def _setup_sandbox(tmp_path):
"""Create PATH-shimmed docker/curl/git/sleep stubs + a rewritten hook copy."""
binx = tmp_path / "bin"
binx.mkdir()
state = tmp_path / "state"
state.mkdir()
repo = tmp_path / "repo"
repo.mkdir()
cnt = state / "curl_count"
# docker: fake a running service + a recoverable previous image.
_write_exec(str(binx / "docker"), """#!/bin/bash
case "$1" in
compose)
for a in "$@"; do [ "$a" = "ps" ] && { echo "fakecid"; exit 0; }; done
exit 0;;
inspect) echo "sha256:previmage"; exit 0;;
image) exit 0;; # docker image inspect <img> -> found
tag) exit 0;;
*) exit 0;;
esac
""")
# curl: first 20 invocations (10 deploy health attempts x2 calls) UNHEALTHY,
# then HEALTHY (the rolled-back previous image).
_write_exec(str(binx / "curl"), f"""#!/bin/bash
CNT="{cnt}"
n=$(cat "$CNT" 2>/dev/null || echo 0); n=$((n+1)); echo "$n" > "$CNT"
iscode=""
for a in "$@"; do [ "$a" = "-w" ] && iscode=1; done
if [ "$n" -gt 20 ]; then
[ -n "$iscode" ] && echo "200" || echo '{{"status":"ok"}}'
else
[ -n "$iscode" ] && echo "000" || echo ""
fi
exit 0
""")
_write_exec(str(binx / "git"), "#!/bin/bash\nexit 0\n")
# Shim sleep to a no-op so the simulation runs fast (real timing is governed
# by the hook's sleep args; here we only assert the rollback CONTROL FLOW).
_write_exec(str(binx / "sleep"), "#!/bin/bash\nexit 0\n")
# Copy the hook, repointing REPO to the sandbox (avoids the hardcoded prod path).
hook_text = open(HOOK, encoding="utf-8").read()
hook_text = hook_text.replace(
"REPO=/home/slin/repos/orchestrator", f"REPO={repo}"
)
hook_copy = tmp_path / "hook.sh"
_write_exec(str(hook_copy), hook_text)
env = {
**os.environ,
"PATH": f"{binx}:{os.environ['PATH']}",
"LOG": str(state / "hook.log"),
"PREV_IMAGE_FILE": str(state / "prev-image"),
"COMPOSE_PROFILE": "staging",
"TARGET_SERVICE": "orchestrator-staging",
"TARGET_PORT": "8501",
}
return hook_copy, env
def test_tc19_unhealthy_deploy_auto_rolls_back_exit1(tmp_path):
hook_copy, env = _setup_sandbox(tmp_path)
t0 = time.time()
proc = subprocess.run(
["bash", str(hook_copy), "--deploy"],
env=env, capture_output=True, text=True, timeout=60,
)
elapsed = time.time() - t0
# AC-9: unhealthy deploy -> auto rollback succeeded on the previous image -> exit 1.
assert proc.returncode == 1, f"stdout={proc.stdout}\nstderr={proc.stderr}"
out = proc.stdout + proc.stderr
assert "AUTO ROLLBACK" in out
assert "rolled back to previous image successfully" in out
# MTTR well under the 60s budget (sleeps shimmed; control flow only).
assert elapsed < 60

View File

@@ -0,0 +1,102 @@
"""ORCH-036 TC-12/13: no silent deploy — both Plane AND Telegram are notified (AC-6).
The finalizer (Phase C) must announce the prod-deploy outcome on BOTH channels:
* TC-12 — a SUCCESS deploy -> a Plane comment AND a Telegram message.
* TC-13 — a FAILED deploy (rollback) -> a Plane comment AND a Telegram message.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_notif.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
monkeypatch.setattr(stage_engine.self_deploy, "write_deploy_log", MagicMock(return_value=True))
monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", MagicMock())
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _pass(*a, **k):
return (True, "ok")
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
def _run_finalizer(task_id):
stage_engine.run_deploy_finalizer(
{"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "deploy-finalizer"}
)
def test_tc12_success_notifies_plane_and_telegram(monkeypatch):
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "0")
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_deploy_status": _pass},
)
task_id = _make_task("deploy")
_run_finalizer(task_id)
assert stage_engine.plane_add_comment.called
assert stage_engine.send_telegram.called
def test_tc13_rollback_notifies_plane_and_telegram(monkeypatch):
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "1")
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_deploy_status": _fail("Deploy status: FAILED")},
)
task_id = _make_task("deploy")
_run_finalizer(task_id)
# The БАГ-8 rollback path announces on both channels (no silent failure).
assert stage_engine.send_telegram.called
assert stage_engine.plane_add_comment.called or stage_engine.plane_notify_qg.called

View File

@@ -0,0 +1,100 @@
"""ORCH-036 TC-10: a FAILED prod deploy rolls back deploy -> development (AC-4).
The finalizer (Phase C) reads the hook ``result`` sentinel, maps a non-zero exit
to ``deploy_status: FAILED`` and then drives the EXISTING deploy contract via
``advance_stage(finished_agent="deployer")``. With a FAILED verdict the БАГ-8
rollback fires: deploy -> development, ``set_issue_blocked`` + Telegram alert, and
(for the self-hosting repo) the merge-lease is released so the branch is not
wedged. The hook exit-code -> verdict mapping is unit-tested in
``test_deploy_hook_mapping.py``; here we assert the engine REACTION.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_rollback.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
# The finalizer's deploy-log write touches a git worktree we don't have here;
# the verdict it drives comes from check_deploy_status (monkeypatched below).
monkeypatch.setattr(stage_engine.self_deploy, "write_deploy_log", MagicMock(return_value=True))
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
def test_tc10_failed_deploy_rolls_back_to_development(monkeypatch):
# Hook reported exit 1 (rolled back) -> the host wrapper wrote result=1.
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "1")
# The deploy-log verdict the gate reads is FAILED.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_deploy_status": _fail("Deploy status: FAILED")},
)
task_id = _make_task("deploy")
stage_engine.run_deploy_finalizer(
{"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "deploy-finalizer"}
)
# БАГ-8 rollback fired: NOT done, back on development, blocked + alerted.
assert _stage(task_id) == "development"
assert stage_engine.set_issue_blocked.called
assert stage_engine.send_telegram.called
assert stage_engine.set_issue_done.called is False

View File

@@ -0,0 +1,174 @@
"""ORCH-036 TC-07/08/09: self vs non-self deploy routing (AC-2, AC-11).
* TC-07 — ``is_self_hosting_repo``/``self_deploy_applies`` recognise the
orchestrator repo and reject any other (no regression).
* TC-08 — for the self repo the restart is launched as a DETACHED host process
(ssh + setsid + background), never synchronously inside the agent.
* TC-09 — for a non-self repo (enduro-trails) the deploy keeps the legacy path:
the self-deploy Phase A/B logic does NOT apply.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_routing.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
from src.qg.checks import is_self_hosting_repo # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo, branch, wi):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent, repo, task_id FROM jobs ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def _pass(*a, **k):
return (True, "ok")
# ---------------------------------------------------------------------------
# TC-07: routing predicate
# ---------------------------------------------------------------------------
def test_tc07_is_self_hosting_repo_only_orchestrator():
assert is_self_hosting_repo("orchestrator") is True
assert is_self_hosting_repo("ORCHESTRATOR") is True # case-insensitive
assert is_self_hosting_repo("enduro-trails") is False
assert is_self_hosting_repo("") is False
assert is_self_hosting_repo(None) is False
def test_tc07_self_deploy_applies_mirrors_routing(monkeypatch):
monkeypatch.setattr(self_deploy.settings, "self_deploy_enabled", True)
monkeypatch.setattr(self_deploy.settings, "self_deploy_repos", "")
assert self_deploy.self_deploy_applies("orchestrator") is True
assert self_deploy.self_deploy_applies("enduro-trails") is False
# Global kill-switch wins.
monkeypatch.setattr(self_deploy.settings, "self_deploy_enabled", False)
assert self_deploy.self_deploy_applies("orchestrator") is False
# ---------------------------------------------------------------------------
# TC-08: self repo -> DETACHED host process (ssh + setsid + background)
# ---------------------------------------------------------------------------
def test_tc08_self_repo_launches_detached_host_process(monkeypatch):
"""The deploy command must be an ssh invocation that detaches the hook via
setsid and backgrounds it (`&`), so it survives the prod container restart —
i.e. NOT a synchronous in-agent call."""
monkeypatch.setattr(self_deploy.settings, "deploy_ssh_user", "slin")
monkeypatch.setattr(self_deploy.settings, "deploy_ssh_host", "mva154")
cmd = self_deploy.build_deploy_command("orchestrator", "ORCH-036", "feature/ORCH-036-x")
assert cmd[0] == "ssh"
assert "slin@mva154" in cmd
remote = cmd[-1]
assert "setsid" in remote # detached session
assert remote.rstrip().endswith("&") # backgrounded
assert "</dev/null" in remote # stdin detached
assert "--deploy" in remote # runs the deploy hook
def test_tc08_initiate_deploy_uses_subprocess_not_blocking(monkeypatch):
"""initiate_deploy dispatches via subprocess (the ssh call returns at once);
a rc=0 means 'detached process launched', not 'deploy finished'."""
captured = {}
def fake_run(cmd, **kwargs):
captured["cmd"] = cmd
return MagicMock(returncode=0, stdout="", stderr="")
monkeypatch.setattr(self_deploy.settings, "deploy_ssh_host", "mva154")
monkeypatch.setattr(self_deploy.subprocess, "run", fake_run)
ok, msg = self_deploy.initiate_deploy("orchestrator", "ORCH-036", "feature/ORCH-036-x")
assert ok is True
assert captured["cmd"][0] == "ssh"
assert "detached" in msg
# ---------------------------------------------------------------------------
# TC-09: non-self repo -> legacy path, self-deploy logic does not apply
# ---------------------------------------------------------------------------
def test_tc09_non_self_repo_uses_legacy_path(monkeypatch):
"""enduro-trails on the deploy-staging -> deploy edge: no Phase A interception,
the deployer is enqueued for the deploy stage exactly as before ORCH-036."""
monkeypatch.setattr(stage_engine.settings, "deploy_require_manual_approve", True)
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_staging_status": _pass},
) # check_branch_mergeable left REAL -> N/A for non-self repo
# Spy: self-deploy must not be initiated for a non-self repo.
initiate = MagicMock()
monkeypatch.setattr(stage_engine.self_deploy, "initiate_deploy", initiate)
task_id = _make_task("deploy-staging", "enduro-trails", "feature/ET-009-x", "ET-009")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ET-009",
"feature/ET-009-x", finished_agent="deployer",
)
assert res.advanced is True
assert _stage(task_id) == "deploy"
assert res.note != "self-deploy-approval-pending"
initiate.assert_not_called()
# Legacy path enqueues the deployer for the deploy stage.
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "deployer"
# No self-deploy marker for the non-self repo.
assert not self_deploy.has_marker("enduro-trails", "ET-009", self_deploy.APPROVE_REQUESTED)

View File

@@ -0,0 +1,104 @@
"""ORCH-036 TC-17: a SUCCESS prod deploy preserves the terminal-sync contract (AC-10).
When the finalizer (Phase C) reads exit 0 -> ``deploy_status: SUCCESS`` and drives
``advance_stage(finished_agent="deployer")``, the EXISTING deploy->done transition
must still fire unchanged: stage becomes ``done``, ``set_issue_done`` is called, no
agent is launched, and the merge-lease is released (terminal-sync, ORCH-43/БАГ-8
contract). ORCH-036 only changes HOW the verdict is produced, never the contract.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_deploy_terminal.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
monkeypatch.setattr(stage_engine.self_deploy, "write_deploy_log", MagicMock(return_value=True))
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _jobs():
conn = get_db()
rows = conn.execute("SELECT agent FROM jobs ORDER BY id").fetchall()
conn.close()
return [r[0] for r in rows]
def _pass(*a, **k):
return (True, "ok")
def test_tc17_success_deploy_syncs_terminal_done(monkeypatch):
# Hook reported exit 0 -> the host wrapper wrote result=0.
self_deploy.write_marker("orchestrator", "ORCH-036", self_deploy.RESULT, "0")
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_deploy_status": _pass},
)
# Spy the merge-lease release to confirm the terminal-sync still frees it.
release = MagicMock()
monkeypatch.setattr(stage_engine.merge_gate, "release_merge_lease", release)
task_id = _make_task("deploy")
stage_engine.run_deploy_finalizer(
{"task_id": task_id, "repo": "orchestrator", "id": 1, "agent": "deploy-finalizer"}
)
assert _stage(task_id) == "done"
assert stage_engine.set_issue_done.called
# The merge-lease is released on the deploy->done terminal-sync.
release.assert_called_once_with("orchestrator", "feature/ORCH-036-x")
# No agent is launched leaving deploy (terminal).
assert _jobs() == []

53
tests/test_qg_checks.py Normal file
View File

@@ -0,0 +1,53 @@
"""ORCH-036 TC-15: the deploy-verdict parse contract is unchanged (AC-10).
``_parse_deploy_status`` reads ONLY the machine-readable ``deploy_status:`` YAML
frontmatter (never prose). ORCH-036 produces the verdict differently (a
deterministic finalizer instead of an LLM), but the parse contract that the gate
relies on must remain bit-identical:
SUCCESS -> (True, ...), FAILED -> (False, ...), no/!frontmatter -> (False, ...).
"""
import os
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from src.qg.checks import _parse_deploy_status # noqa: E402
from src.self_deploy import build_deploy_log # noqa: E402
def test_tc15_success_frontmatter_passes():
ok, reason = _parse_deploy_status("---\ndeploy_status: SUCCESS\n---\n\nbody")
assert ok is True
assert "SUCCESS" in reason
def test_tc15_failed_frontmatter_fails():
ok, reason = _parse_deploy_status("---\ndeploy_status: FAILED\n---\n\nbody")
assert ok is False
assert "FAILED" in reason
def test_tc15_no_frontmatter_fails():
ok, _ = _parse_deploy_status("just prose, deploy_status: SUCCESS in text but no frontmatter")
assert ok is False
def test_tc15_missing_field_fails():
ok, _ = _parse_deploy_status("---\nother_field: SUCCESS\n---\n")
assert ok is False
def test_tc15_prose_success_word_does_not_pass():
"""Defensive: the word SUCCESS in prose must NOT satisfy the gate."""
ok, _ = _parse_deploy_status("# Deploy\n\nDeploy was a SUCCESS, hooray!\n")
assert ok is False
def test_tc15_finalizer_log_roundtrips_through_parser():
"""The finalizer's rendered log must be readable by the EXISTING parser —
SUCCESS passes, FAILED fails — proving the producer/consumer contract holds."""
ok_s, _ = _parse_deploy_status(build_deploy_log("ORCH-036", 0, "SUCCESS"))
ok_f, _ = _parse_deploy_status(build_deploy_log("ORCH-036", 2, "FAILED"))
assert ok_s is True
assert ok_f is False

View File

@@ -822,7 +822,12 @@ class TestMergeGate:
def test_tc20_pass_advances_to_deploy(self, monkeypatch):
"""TC-20 / AC-1: gate PASS (rebased + green) -> advance to deploy, deployer
enqueued, NO rollback. staging gate must pass first (same edge)."""
enqueued, NO rollback. staging gate must pass first (same edge).
ORCH-036: disable the manual-approve self-deploy interception so this test
keeps exercising the merge-gate in isolation (the executable self-deploy
Phase A path is covered separately in test_deploy_approve.py)."""
monkeypatch.setattr(stage_engine.settings, "deploy_require_manual_approve", False)
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,

41
tests/test_stages.py Normal file
View File

@@ -0,0 +1,41 @@
"""ORCH-036 TC-16: STAGE_TRANSITIONS for deploy are unchanged (AC-10).
ORCH-036 only changes HOW the deploy verdict is produced (a deterministic
finalizer) — it must NOT touch the state machine. The deploy edge keeps its
exact transition (deploy -> done), no in-line agent (None), and the gate
``check_deploy_status``. The deploy-staging edge is likewise untouched.
"""
import os
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
from src.stages import ( # noqa: E402
STAGE_TRANSITIONS,
get_agent_for_stage,
get_next_stage,
get_qg_for_stage,
)
def test_tc16_deploy_transition_unchanged():
assert STAGE_TRANSITIONS["deploy"] == {
"next": "done", "agent": None, "qg": "check_deploy_status"
}
assert get_next_stage("deploy") == "done"
assert get_agent_for_stage("deploy") is None
assert get_qg_for_stage("deploy") == "check_deploy_status"
def test_tc16_deploy_staging_transition_unchanged():
assert STAGE_TRANSITIONS["deploy-staging"] == {
"next": "deploy", "agent": "deployer", "qg": "check_staging_status"
}
assert get_next_stage("deploy-staging") == "deploy"
assert get_agent_for_stage("deploy-staging") == "deployer"
assert get_qg_for_stage("deploy-staging") == "check_staging_status"
def test_tc16_done_is_terminal():
assert get_next_stage("done") is None

View File

@@ -0,0 +1,99 @@
"""ORCH-036 TC-11: the staging precondition is preserved (AC-8).
A red staging gate (``staging_status: FAILED``) must roll the task back to
development and NEVER let it reach the ``deploy`` stage — so the executable
prod self-deploy can never be initiated off a failed staging run. ORCH-036 adds
its Phase A interception AFTER ``check_staging_status``, so a staging failure
short-circuits before any self-deploy logic runs.
"""
import os
import tempfile
import pytest
_test_db = os.path.join(tempfile.gettempdir(), "test_orch_staging_precond.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
from unittest.mock import MagicMock # noqa: E402
import src.db as _db # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import stage_engine # noqa: E402
from src import self_deploy # noqa: E402
from src.stage_engine import advance_stage # noqa: E402
@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr(self_deploy.settings, "repos_dir", str(tmp_path))
monkeypatch.setattr(self_deploy.settings, "host_repos_dir", str(tmp_path))
yield
@pytest.fixture(autouse=True)
def silence_side_effects(monkeypatch):
for name in (
"notify_stage_change", "notify_qg_failure", "notify_approve_requested",
"send_telegram", "plane_notify_stage", "plane_notify_qg", "plane_add_comment",
"set_issue_in_review", "set_issue_needs_input", "set_issue_in_progress",
"set_issue_blocked", "set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
def _make_task(stage, repo="orchestrator", branch="feature/ORCH-036-x", wi="ORCH-036"):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
"VALUES (?, ?, ?, ?, ?)",
(f"plane-{wi}", wi, repo, branch, stage),
)
task_id = cur.lastrowid
conn.commit()
conn.close()
return task_id
def _stage(task_id):
conn = get_db()
row = conn.execute("SELECT stage FROM tasks WHERE id=?", (task_id,)).fetchone()
conn.close()
return row[0]
def _fail(reason):
def _f(*a, **k):
return (False, reason)
return _f
def test_tc11_staging_failed_never_reaches_deploy(monkeypatch):
monkeypatch.setattr(stage_engine.settings, "deploy_require_manual_approve", True)
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _fail("Staging status: FAILED")},
)
# Guard: a failed staging run must not trigger any self-deploy logic.
initiate = MagicMock()
monkeypatch.setattr(stage_engine.self_deploy, "initiate_deploy", initiate)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "orchestrator", "ORCH-036",
"feature/ORCH-036-x", finished_agent="deployer",
)
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development" # NEVER reached deploy
initiate.assert_not_called()
assert not self_deploy.has_marker("orchestrator", "ORCH-036", self_deploy.APPROVE_REQUESTED)