"""Executable self-deploy primitives (ORCH-036). The ``deploy`` stage for the self-hosting ``orchestrator`` repo is a REAL prod restart, not a paper LLM verdict. Because the prod container (8500) runs the worker/agent itself, the restart must be performed by an EXTERNAL host process that survives the container dying (BR-2). The orchestration is split into three deterministic phases (ADR-001), wired in ``stage_engine``: * Phase A — request approve on the ``deploy-staging -> deploy`` edge. * Phase B — a human Plane ``Approved`` initiates the detached host deploy. * Phase C — a deterministic finalizer maps the hook exit-code -> deploy_status. This module is a **leaf**: it imports only config / git_worktree (and lazily ``qg.checks.is_self_hosting_repo``), never ``stage_engine`` / ``launcher`` — the orchestration that needs those lives in ``stage_engine``. Every public helper honours a **never-raise** contract so a deploy-state hiccup can never crash the stage engine. Restart-safe state lives in sentinel files under ``/.deploy-state-//`` (mirrors the merge-lease pattern, ТЗ §4 — no DB migration), on the shared mount visible to BOTH the container (reads markers) and the host (writes ``result``): * ``approve-requested`` — Phase A done; * ``initiated`` — Phase B started (idempotency-guard); * ``result`` — the hook exit-code, written by the host WRAPPER (``echo $? > result``), NOT by the hook itself. """ import logging import os import shlex import shutil import subprocess from .config import settings logger = logging.getLogger("orchestrator.self_deploy") # Sentinel marker filenames (see module docstring). APPROVE_REQUESTED = "approve-requested" INITIATED = "initiated" RESULT = "result" # ssh launch is detached (returns immediately); keep a bounded timeout so a hung # ssh handshake never wedges the caller. 
_SSH_TIMEOUT = 30 _GIT_TIMEOUT = 60 # --------------------------------------------------------------------------- # Conditionality # --------------------------------------------------------------------------- def self_deploy_applies(repo: str) -> bool: """Whether executable self-deploy (Phase A/B/C) is REAL for this repo. Mirrors the ORCH-35 / ORCH-43 conditional rollout: * ``self_deploy_enabled=False`` -> always False (global kill-switch); the legacy synchronous deployer path runs for everyone. * ``self_deploy_repos`` (CSV) non-empty -> real only for listed repos. * empty CSV -> real ONLY for the self-hosting repo (``orchestrator``). Never raises. """ try: if not settings.self_deploy_enabled: return False raw = (settings.self_deploy_repos or "").strip() if raw: allowed = {r.strip().lower() for r in raw.split(",") if r.strip()} return (repo or "").strip().lower() in allowed # Lazy import keeps this module a leaf (avoids importing qg at module load). from .qg.checks import is_self_hosting_repo return is_self_hosting_repo(repo) except Exception as e: # noqa: BLE001 - never-raise contract logger.warning("self_deploy_applies error for %s: %s", repo, e) return False # --------------------------------------------------------------------------- # exit-code -> deploy_status mapping (pure, unit-tested: TC-01/02/03) # --------------------------------------------------------------------------- def map_exit_code_to_status(exit_code) -> str: """Map a deploy-hook exit-code to a machine verdict (deterministic, pure). Contract (AC-1 / AC-3, hook exit-code contract 0/1/2): * ``0`` -> ``"SUCCESS"`` (health-ok proven by the hook). * ``1`` (rolled back), ``2`` (rollback also failed), anything else, or a non-int/None -> ``"FAILED"`` (fail-closed; never advances on doubt). 
""" try: code = int(exit_code) except (TypeError, ValueError): return "FAILED" return "SUCCESS" if code == 0 else "FAILED" def build_deploy_log(work_item_id: str, exit_code, status: str) -> str: """Render a 14-deploy-log.md body whose ``deploy_status:`` frontmatter is the verdict ``check_deploy_status`` / ``_parse_deploy_status`` reads (contract unchanged, AC-10). The body is informational only — only the frontmatter is machine-read. """ return ( "---\n" f"deploy_status: {status}\n" f"work_item: {work_item_id}\n" f"hook_exit_code: {exit_code}\n" "deployed_by: deploy-finalizer\n" "---\n\n" "# Deploy log — ORCH-036 executable self-deploy\n\n" f"Прод-деплой завершён хост-хуком с exit-code `{exit_code}` -> " f"`deploy_status: {status}`.\n\n" "Вердикт зафиксирован детерминированным finalizer'ом (Фаза C), не LLM.\n" ) # --------------------------------------------------------------------------- # Sentinel state (restart-safe, no DB migration — ТЗ §4) # --------------------------------------------------------------------------- def _state_dir(base: str, repo: str, work_item_id: str | None) -> str: return os.path.join(base, f".deploy-state-{repo}", (work_item_id or "_")) def container_state_dir(repo: str, work_item_id: str | None) -> str: """State dir as seen FROM THE CONTAINER (settings.repos_dir mount).""" return _state_dir(settings.repos_dir, repo, work_item_id) def host_state_dir(repo: str, work_item_id: str | None) -> str: """State dir as seen FROM THE HOST (settings.host_repos_dir). Same physical directory as ``container_state_dir`` via the shared mount; the host path is what we embed in the ssh command so the host wrapper writes the ``result`` sentinel where the container can read it. 
""" return _state_dir(settings.host_repos_dir, repo, work_item_id) def marker_path(repo: str, work_item_id: str | None, name: str) -> str: return os.path.join(container_state_dir(repo, work_item_id), name) def has_marker(repo: str, work_item_id: str | None, name: str) -> bool: """True iff the named sentinel exists. Never raises.""" try: return os.path.isfile(marker_path(repo, work_item_id, name)) except Exception as e: # noqa: BLE001 - never-raise logger.warning("has_marker error for %s/%s/%s: %s", repo, work_item_id, name, e) return False def write_marker(repo: str, work_item_id: str | None, name: str, content: str = "") -> bool: """Create/overwrite a sentinel (best-effort). Returns True on success.""" try: d = container_state_dir(repo, work_item_id) os.makedirs(d, exist_ok=True) with open(os.path.join(d, name), "w", encoding="utf-8") as f: f.write(str(content)) return True except OSError as e: logger.warning("write_marker error for %s/%s/%s: %s", repo, work_item_id, name, e) return False def clear_state(repo: str, work_item_id: str | None) -> bool: """Remove ALL deploy-state sentinels for this work item (best-effort). Sentinels are keyed by ``work_item_id`` (stable for the whole task lifetime), so a FAILED prod-deploy leaves ``approve-requested`` / ``initiated`` / ``result`` behind. Without cleanup, after the БАГ-8 rollback (deploy -> development) and a fix, the task reaching ``deploy`` again would hit Phase B's idempotency-guard: the STALE ``initiated`` makes it a no-op, the detached hook never re-launches and the task wedges on ``deploy`` forever (re-deploy-after-rollback contract broken; AC-4/AC-10). A stale ``result`` would likewise be mis-read by the new finalizer. Clearing the whole state dir restores a clean slate for the next pass. Idempotent (a missing dir is success). Never raises. 
""" d = container_state_dir(repo, work_item_id) try: shutil.rmtree(d) logger.info("clear_state: removed deploy-state dir %s", d) return True except FileNotFoundError: return True except OSError as e: # noqa: BLE001 - never-raise contract logger.warning("clear_state error for %s/%s: %s", repo, work_item_id, e) return False def read_result(repo: str, work_item_id: str | None) -> tuple[bool, int | None]: """Read the ``result`` sentinel (hook exit-code written by the host wrapper). Returns ``(present, exit_code)``: * ``(False, None)`` -> not written yet (finalizer should DEFER); * ``(True, )`` -> verdict ready; * ``(True, 1)`` -> present but corrupt/unparseable -> treated as a failure code (fail-closed) so we never advance on garbage. Never raises. """ p = marker_path(repo, work_item_id, RESULT) try: with open(p, "r", encoding="utf-8") as f: raw = f.read().strip() except FileNotFoundError: return False, None except OSError as e: logger.warning("read_result error for %s/%s: %s", repo, work_item_id, e) return False, None if raw == "": return False, None try: return True, int(raw) except ValueError: logger.warning("read_result: corrupt result %r for %s/%s", raw, repo, work_item_id) return True, 1 # --------------------------------------------------------------------------- # Detached host deploy: ssh + setsid (Phase B) # --------------------------------------------------------------------------- def build_deploy_command(repo: str, work_item_id: str | None, branch: str) -> list[str]: """Build the ssh argv that launches the DETACHED prod deploy on the host. The remote command runs the hook via ``setsid`` with stdin/stdout detached and backgrounded (``&``) so the process SURVIVES the prod container restart (BR-2), then the WRAPPER (not the hook) writes the exit-code to the ``result`` sentinel: setsid bash -c 'cd && bash --deploy; \ echo $? > ' >> 2>&1 `` makes the hook retag the staging-validated image to the prod tag instead of rebuilding (no ``docker build``). 
The exit-code contract of the hook is untouched. Provenance guard (ORCH-058, Strategy B): when the image-freshness feature is active for this repo, the VALIDATED commit SHA is passed as ``EXPECTED_REVISION=`` so the hook fail-closes (``exit 1``) before ``docker tag`` if SOURCE_IMAGE's revision label does not match — a stale image can never be silently promoted. When inactive (non-self / kill-switch off) ``expected_revision`` returns ``""`` and the env is omitted, keeping the hook's backward-compatible "no provenance check" behaviour (AC-5 / AC-7). """ from . import image_freshness host_dir = host_state_dir(repo, work_item_id) result_sentinel = os.path.join(host_dir, RESULT) hook_log = os.path.join(host_dir, "hook.log") env_assignments = ( f"SOURCE_IMAGE={shlex.quote(settings.deploy_prod_source_image)} " f"TARGET_SERVICE={shlex.quote(settings.deploy_prod_target_service)} " f"TARGET_PORT={int(settings.deploy_prod_target_port)} " f"TARGET_IMAGE={shlex.quote(settings.deploy_prod_target_image)} " f"COMPOSE_PROFILE={shlex.quote(settings.deploy_prod_compose_profile)} " f"PREV_IMAGE_FILE={shlex.quote(settings.deploy_prod_prev_image_file)}" ) expected_rev = image_freshness.expected_revision(repo, branch) if expected_rev: env_assignments += f" EXPECTED_REVISION={shlex.quote(expected_rev)}" inner = ( f"cd {shlex.quote(settings.deploy_host_repo_path)} && " f"{env_assignments} " f"bash {shlex.quote(settings.deploy_hook_script)} --deploy; " f"echo $? > {shlex.quote(result_sentinel)}" ) remote = ( f"setsid bash -c {shlex.quote(inner)} " f">> {shlex.quote(hook_log)} 2>&1 tuple[bool, str]: """Launch the detached prod deploy on the host (Phase B). Never raises. The ssh call returns immediately (the remote process is detached via setsid + ``&``). Returns ``(True, msg)`` when ssh dispatched the detached process, or ``(False, reason)`` so the caller can alert and let the human re-approve. """ # Ensure the shared state dir exists so the host wrapper can write `result`. 
# NOTE(review): the ``def`` line of ``initiate_deploy`` is missing from the
# reviewed copy. The signature below is restored from the names the body uses
# and the surviving ``-> tuple[bool, str]`` annotation — confirm parameter
# order and defaults against version control.
def initiate_deploy(repo: str, work_item_id: str | None, branch: str) -> tuple[bool, str]:
    """Launch the detached prod deploy on the host (Phase B).

    Runs the argv produced by ``build_deploy_command`` with a bounded timeout.

    Returns:
        ``(True, msg)`` when the launch command exited with code 0, otherwise
        ``(False, reason)`` so the caller can alert and allow a re-approve.

    Never raises: a failure to build the command or to start the process is
    reported through the ``(False, reason)`` result.
    """
    # Ensure the shared state dir exists so the host wrapper can write `result`.
    # Best-effort: a failure is logged and the launch is still attempted.
    try:
        os.makedirs(container_state_dir(repo, work_item_id), exist_ok=True)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("initiate_deploy: state dir error for %s/%s: %s", repo, work_item_id, e)

    # Building the argv reads several settings and can fail (e.g. a
    # non-numeric port); that must surface as a result, not an exception.
    try:
        cmd = build_deploy_command(repo, work_item_id, branch)
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("initiate_deploy: command build error for %s/%s: %s",
                       repo, work_item_id, e)
        return False, f"deploy command build error: {e}"

    try:
        launched = subprocess.run(cmd, capture_output=True, text=True, errors="replace",
                                  timeout=_SSH_TIMEOUT)
    except subprocess.TimeoutExpired:
        return False, "ssh launch timeout"
    except Exception as e:  # noqa: BLE001 - also covers bad argv (TypeError/ValueError)
        return False, f"ssh launch error: {e}"

    if launched.returncode != 0:
        # Keep the reason short: it ends up in an alert message.
        detail = ((launched.stderr or "") + (launched.stdout or "")).strip()[:200]
        return False, f"ssh launch failed (rc={launched.returncode}): {detail}"

    logger.info("initiate_deploy: detached prod deploy dispatched for %s/%s", repo, work_item_id)
    return True, "deploy initiated (detached host process)"


# ---------------------------------------------------------------------------
# Deploy log write + best-effort merge (Phase C)
# ---------------------------------------------------------------------------
def _commit_and_push_deploy_log(wt: str, rel: str, branch: str, message: str) -> None:
    """Stage, commit and push ``rel`` inside worktree ``wt``; log every failure."""
    git_env = {
        **os.environ,
        "HOME": "/home/slin",
        "GIT_AUTHOR_NAME": "deploy-finalizer",
        "GIT_AUTHOR_EMAIL": "deploy-finalizer@mva154.local",
        "GIT_COMMITTER_NAME": "deploy-finalizer",
        "GIT_COMMITTER_EMAIL": "deploy-finalizer@mva154.local",
    }

    def run_git(*args: str) -> subprocess.CompletedProcess:
        # errors="replace": undecodable git output must not raise while the
        # captured text is only used for logging.
        return subprocess.run(["git", "-C", wt, *args], capture_output=True, text=True,
                              errors="replace", timeout=_GIT_TIMEOUT, env=git_env)

    try:
        staged = run_git("add", rel)
        if staged.returncode != 0:
            logger.warning("write_deploy_log: git add failed (rc=%s): %s",
                           staged.returncode, (staged.stderr or "").strip()[:200])
        committed = run_git("commit", "-m", message)
        if committed.returncode != 0:
            # Also the normal outcome when there is nothing new to commit.
            logger.info("write_deploy_log: git commit skipped (rc=%s): %s",
                        committed.returncode,
                        ((committed.stderr or "") + (committed.stdout or "")).strip()[:200])
            return
        pushed = run_git("push", "origin", branch)
        if pushed.returncode != 0:
            logger.warning("write_deploy_log: git push failed (rc=%s): %s",
                           pushed.returncode, (pushed.stderr or "").strip()[:200])
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("write_deploy_log: git commit/push best-effort failed: %s", e)


def write_deploy_log(repo: str, work_item_id: str, branch: str, exit_code, status: str) -> bool:
    """Write ``14-deploy-log.md`` into the task worktree, then commit and push it.

    The file goes to ``docs/work-items/<work_item_id>/14-deploy-log.md`` under
    the path returned by ``git_worktree.get_worktree_path(repo, branch)``.
    Commit and push are best-effort: their failures are logged and do not
    change the return value.

    Returns:
        True iff the file was written.

    Never raises.
    """
    rel = f"docs/work-items/{work_item_id}/14-deploy-log.md"
    try:
        # Imported lazily, and inside the guard so an import failure is
        # reported like any other worktree error.
        from .git_worktree import get_worktree_path

        wt = get_worktree_path(repo, branch)
        path = os.path.join(wt, rel)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("write_deploy_log: worktree error for %s/%s: %s", repo, branch, e)
        return False

    try:
        content = build_deploy_log(work_item_id, exit_code, status)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("write_deploy_log: write error at %s: %s", path, e)
        return False

    # Best-effort commit + push (the gate also falls back to origin/main).
    _commit_and_push_deploy_log(
        wt, rel, branch, f"deploy(ORCH-036): finalize {status} for {work_item_id}"
    )
    return True