#!/usr/bin/env python3 """setup_lite.py — интерактивный installer Lite-тиража (ORCH-104). Один операторский CLI, автоматизирующий маршрут docs/deployment/LITE_SETUP.md §2–§12 для внешнего оператора/заказчика: скан предусловий хоста с офером доустановки → discovery docker-инсталляций Plane/Gitea → интерактивный сбор обязательных ключей с немедленной верификацией → автодетект хост-параметров и когерентность портов → сборка .env/.env.watchdog от канонов → webhook Plane → guard-ы Gitea → подъём ровно орк+watchdog → регистрация проекта строго кирпичом onboard_project.py → итоговый отчёт PASS/FAIL/MANUAL. Режимы (ADR-001 D2, семейная лексика ORCH-009/103): plan — строгий read-only: скан предусловий + discovery + автодетект + план шагов; ноль мутаций ФС/docker/сети. exit 0 (блокеров нет) / 2 (есть). apply — ДЕФОЛТ-wizard (бизнес-цель «одна команда»). Безопасность дефолта — структурно, не режимом: фаза 0 ≡ plan (read-only), ранний guard чужого .env (маркер managed-файла), per-action consent на каждую мутацию, non-TTY без --yes → exit 2 ДО любой мутации. verify — read-only пост-проверка (/health + /queue + /metrics, состав «ровно орк+watchdog», stateless-чистота §12). Exit-коды (контракт FR-1): 0 — все шаги PASS; 2 — остановка на manual-step / незавершённое предусловие; 1 — ошибка. Гарантии (NFR-1/3, ADR-001): * python stdlib-only; модули платформы (src.*) не импортируются — канон-знания (секреты, статусы/лейблы/репо/вебхуки) идут ТОЛЬКО субпроцессами кирпичей scripts/gen_secrets.py и scripts/onboard_project.py; * значения секретов НИКОГДА не печатаются (скрытый ввод, только имена ключей); * delete-операций НЕТ ВООБЩЕ: лечение — всегда инструкция (no-delete); * существующий немаркированный .env/.env.watchdog не перетирается без --force; * никаких операций с веткой main / force-push; рестарт — только собственного свежеподнятого контура (никогда чужие/боевые контейнеры). Канон маршрута — docs/deployment/LITE_SETUP.md (скрипт = рекомендованный быстрый путь §1.1; ручной маршрут §2–§13 сохранён как fallback для MANUAL-шагов). Запуск — из корня чекаута репо orchestrator на целевом хосте заказчика: python3 scripts/setup_lite.py # интерактивная установка (apply) python3 scripts/setup_lite.py plan # read-only диагностика python3 scripts/setup_lite.py verify # read-only пост-проверка """ import argparse import getpass import json import os import re import shutil import socket import subprocess import sys import tempfile import urllib.error import urllib.request import uuid from dataclasses import dataclass, field REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ROOT_ENV_EXAMPLE = os.path.join(REPO_ROOT, ".env.example") ROOT_ENV = os.path.join(REPO_ROOT, ".env") WATCHDOG_ENV_EXAMPLE = os.path.join(REPO_ROOT, ".env.watchdog.example") WATCHDOG_ENV = os.path.join(REPO_ROOT, ".env.watchdog") GEN_SECRETS = os.path.join(REPO_ROOT, "scripts", "gen_secrets.py") ONBOARD = os.path.join(REPO_ROOT, "scripts", "onboard_project.py") REQUIREMENTS = os.path.join(REPO_ROOT, "requirements.txt") VENV_DIR = os.path.join(REPO_ROOT, ".venv") VENV_PY = os.path.join(VENV_DIR, "bin", "python") # Канон маршрута — каждый шаг ссылается на соответствующий § этого дока (NFR-4). DOC = "docs/deployment/LITE_SETUP.md" # Маркер managed-файла — ключ к resume и к guard'у чужого .env (ADR-001 D6). MANAGED_MARKER = "# managed by scripts/setup_lite.py (ORCH-104)" EXIT_OK = 0 EXIT_MANUAL = 2 EXIT_ERROR = 1 # Дефолтные порты платформы (LITE_SETUP §2.5); busy-check предлагает альтернативу. DEFAULT_PROD_PORT = 8500 DEFAULT_STAGING_PORT = 8501 # Состав базового Lite-контура (FR-8): ровно эти два сервиса по дефолту. LITE_SERVICES = ("orchestrator", "orchestrator-watchdog") # Discovery (ADR-001 D5): опознание инсталляций СТРОГО по префиксам образов, # имена контейнеров/проектов как признак НЕ используются (анти-ложноположительность). PLANE_IMAGE_NEEDLES = ("makeplane/",) GITEA_IMAGE_NEEDLES = ("gitea/gitea", "docker.gitea.com/gitea") POSTGRES_IMAGE_NEEDLES = ("postgres",) # Закрытый набор пакетных менеджеров (ADR-001 D4), детект по наличию бинаря. PACKAGE_MANAGERS = ("apt-get", "dnf", "yum", "zypper") _INSTALL_TEMPLATES = { "apt-get": "sudo apt-get update && sudo apt-get install -y {pkg}", "dnf": "sudo dnf install -y {pkg}", "yum": "sudo yum install -y {pkg}", "zypper": "sudo zypper install -y {pkg}", } # Имена пакетов per-менеджер (карта-константа, ADR-001 D4); "*" — generic. _PACKAGE_NAMES = { "docker": { "apt-get": "docker.io docker-compose-plugin", "dnf": "docker docker-compose-plugin", "yum": "docker docker-compose-plugin", "zypper": "docker docker-compose", }, "git": {"*": "git"}, "python3": {"*": "python3"}, "node": { "apt-get": "nodejs npm", "dnf": "nodejs npm", "yum": "nodejs npm", "zypper": "nodejs npm", }, } # Карта обязательных ключей нового хоста (§4.2 LITE_SETUP); подмножество канона # .env.example — держит анти-дрейф тест (TC-13d). Webhook-секреты — отдельно. MANDATORY_NEW_HOST_KEYS = ( # Plane "ORCH_PLANE_API_URL", "ORCH_PLANE_WEB_URL", "ORCH_PLANE_WORKSPACE_SLUG", "ORCH_PLANE_API_TOKEN", # Gitea "ORCH_GITEA_URL", "ORCH_GITEA_PUBLIC_URL", "ORCH_GITEA_OWNER", "ORCH_GITEA_TOKEN", # webhook-секреты (кирпич gen_secrets.py) "ORCH_PLANE_WEBHOOK_SECRET", "ORCH_GITEA_WEBHOOK_SECRET", # Telegram "ORCH_TELEGRAM_BOT_TOKEN", "ORCH_TELEGRAM_CHAT_ID", # реестр проектов "ORCH_PROJECTS_JSON", # хост-параметры "ORCH_AGENT_HOME_DIR", "ORCH_HOST_REPOS_DIR", "ORCH_HOST_CLAUDE_DIR", "ORCH_HOST_CLAUDE_JSON", "ORCH_HOST_SSH_DIR", "ORCH_HOST_CLAUDE_CODE_DIR", "ORCH_HOST_NODE_BIN", "ORCH_RUN_UID", "ORCH_RUN_GID", "ORCH_DOCKER_GID", "ORCH_DEPLOY_HOST_REPO_PATH", # порты (когерентная тройка + staging) "ORCH_DEPLOY_PROD_TARGET_PORT", "WATCHDOG_METRICS_URL", "ORCH_POST_DEPLOY_BASE_URL", "ORCH_STAGING_PORT", ) # Webhook-секреты — выпускает ТОЛЬКО кирпич gen_secrets.py (канон-знание). WEBHOOK_SECRET_KEYS = ("ORCH_PLANE_WEBHOOK_SECRET", "ORCH_GITEA_WEBHOOK_SECRET") _SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$") _PUBLISHED_PORT_RE = re.compile(r"(?:0\.0\.0\.0|127\.0\.0\.1|\[::\]|):(\d+)->") _WORK_ITEM_RE = re.compile(r"\b([A-Z][A-Z0-9]*)-(\d+)\b") class ManualStop(Exception): """Остановка на manual-step / незавершённом предусловии → exit 2.""" class SetupError(Exception): """Невосстановимая ошибка шага → exit 1.""" # --------------------------------------------------------------------------- # # Инжектируемый I/O (ADR-001 D10): unit-тестируемость без реального TTY. # --------------------------------------------------------------------------- # @dataclass class IO: """Источники ввода/вывода как параметры — wizard тестируется скриптованными ответами без TTY (NFR-5). Значение секрета НИКОГДА не пишется в ``say_fn``.""" input_fn: object # callable(prompt) -> str (видимый ввод) getpass_fn: object # callable(prompt) -> str (скрытый ввод) say_fn: object # callable(str) -> None is_tty: bool = True env: dict = field(default_factory=dict) yes: bool = False def say(self, msg: str) -> None: """Печать строки прогресса (значения секретов сюда НЕ передаются, NFR-3).""" self.say_fn(msg) def ask(self, key, prompt, secret=False, verify=None, max_tries=3, default=None): """Запрос значения ключа ``key`` с опциональной НЕМЕДЛЕННОЙ верификацией ``verify(value) -> (ok, hint)`` (FR-4). Порядок источника значения: env-prefill (то же каноническое имя ключа, D10) → default в non-TTY → интерактивный ввод. Неуспех verify → re-prompt с диагнозом, лимит ``max_tries`` → ManualStop (не бесконечный цикл). Без TTY и без значения — честный ManualStop (никаких зависаний). Секрет идёт через ``getpass_fn`` и НЕ печатается.""" last_hint = "" for attempt in range(1, max_tries + 1): value = None if attempt == 1: env_val = (self.env.get(key) or "").strip() if env_val: value = env_val if value is None: if not self.is_tty: if default is not None: value = default else: raise ManualStop( f"{key}: нет TTY и нет значения (env-prefill {key} + --yes)" ) else: reader = self.getpass_fn if secret else self.input_fn label = key if secret else (f"{key} [{default}]" if default else key) raw = (reader(f" {prompt} ({label}): ") or "").strip() value = raw or (default if default is not None else "") if verify is None: return value ok, hint = verify(value) if ok: return value last_hint = hint self.say(f" ✗ {key}: {hint}") if not self.is_tty: break raise ManualStop(f"{key}: не прошло верификацию ({last_hint})") def consent(self, action) -> bool: """Per-action consent (FR-1). ``--yes`` = заранее данное согласие (headless). non-TTY без ``--yes`` → ManualStop (честный отказ, не зависание). В TTY читает y/N.""" if self.yes: return True if not self.is_tty: raise ManualStop(f"consent «{action}» требует TTY или --yes (D10)") answer = (self.input_fn(f" выполнить «{action}»? [y/N]: ") or "").strip().lower() return answer in ("y", "yes", "д", "да") def _real_io(args: argparse.Namespace) -> IO: return IO( input_fn=input, getpass_fn=getpass.getpass, say_fn=lambda s: print(s, flush=True), is_tty=sys.stdin.isatty(), env=dict(os.environ), yes=bool(getattr(args, "yes", False)), ) # --------------------------------------------------------------------------- # # Чистые функции (unit-тесты — tests/test_setup_lite_script.py) # --------------------------------------------------------------------------- # def parse_env(text: str) -> dict: """``KEY=value``-строки текста → словарь (комментарии/пустые — мимо).""" out: dict = {} for line in (text or "").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) out[key.strip()] = value.strip() return out def render_env(example_text: str, overrides: dict, marker: str = MANAGED_MARKER) -> str: """Рендер env-файла от канона-example (ORCH-101: дефолт = боевое значение — записываются только собранные отличия). Первая строка — фиксированный ``marker`` managed-файла (ADR-001 D6: основа resume и guard'а).""" used: set = set() lines: list = [marker] for line in example_text.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("#") and "=" in stripped: key = stripped.split("=", 1)[0].strip() if key in overrides: lines.append(f"{key}={overrides[key]}") used.add(key) continue lines.append(line) extra = [k for k in overrides if k not in used] if extra: lines.append("") lines.append("# --- setup_lite.py (ORCH-104): дозаполненные ключи ---") for key in extra: lines.append(f"{key}={overrides[key]}") return "\n".join(lines) + "\n" def _rerender_existing(text: str, values: dict) -> str: """Перерендер существующего managed-файла: каждая строка сохраняется (включая маркер и комментарии), ``KEY=`` строки получают значения ``values``, недостающие ключи дописываются. Маркер НЕ дублируется (текст уже им начинается).""" used: set = set() out: list = [] for line in text.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("#") and "=" in stripped: key = stripped.split("=", 1)[0].strip() if key in values: out.append(f"{key}={values[key]}") used.add(key) continue out.append(line) extra = [k for k in values if k not in used] if extra: out.append("") for key in extra: out.append(f"{key}={values[key]}") return "\n".join(out) + "\n" def env_file_state(text) -> str: """Состояние env-файла по содержимому (ADR-001 D6): ``"absent"`` (None) | ``"managed"`` (первая строка — наш маркер) | ``"foreign"`` (чужой/живой).""" if text is None: return "absent" first = text.splitlines()[0].strip() if text.strip() else "" return "managed" if first == MANAGED_MARKER else "foreign" def port_overrides(prod_port) -> dict: """Когерентная тройка прод-порта одной функцией (ADR-001 D7): рассинхрон структурно невозможен (ловушка §2.5/§4.2 закрывается кодом, не дисциплиной).""" p = str(prod_port) return { "ORCH_DEPLOY_PROD_TARGET_PORT": p, "WATCHDOG_METRICS_URL": f"http://127.0.0.1:{p}/metrics", "ORCH_POST_DEPLOY_BASE_URL": f"http://localhost:{p}", } def next_free_port(start, busy=None) -> int: """Первый свободный порт начиная со ``start`` (busy-check инжектируется).""" is_busy = busy or _port_busy port = int(start) while is_busy(port): port += 1 return port def staging_port_ok(staging_port, prod_port) -> bool: """``ORCH_STAGING_PORT == прод-порт`` → fail-closed (инвариант ORCH-058/101).""" return str(staging_port) != str(prod_port) def split_overrides(answers: dict) -> tuple: """Раскладка собранных значений по файлам-носителям (§4.3): ключи ``WATCHDOG_*`` — ТОЛЬКО в ``.env.watchdog`` (в ``.env`` для sidecar инертны), остальные — в корневой ``.env``. Возвращает ``(root_overrides, watchdog_overrides)``.""" root: dict = {} watchdog: dict = {} for key, value in answers.items(): if key.startswith("WATCHDOG_"): watchdog[key] = value else: root[key] = value return root, watchdog def prompt_defaults(facts: dict) -> dict: """Подсказки-дефолты промптов — ТОЛЬКО из автодетекта/канона (FR-5/TC-15): ни одного боевого литерала исходного хоста (stateless).""" home = facts.get("home", "") or "" return { "ORCH_RUN_UID": str(facts.get("uid", "")), "ORCH_RUN_GID": str(facts.get("gid", "")), "ORCH_DOCKER_GID": str(facts.get("docker_gid", "")), "ORCH_HOST_NODE_BIN": facts.get("node_bin", ""), "ORCH_HOST_CLAUDE_CODE_DIR": facts.get("claude_code_dir", ""), "ORCH_AGENT_HOME_DIR": home, "ORCH_HOST_CLAUDE_DIR": os.path.join(home, ".claude") if home else "", "ORCH_HOST_CLAUDE_JSON": os.path.join(home, ".claude.json") if home else "", "ORCH_HOST_REPOS_DIR": facts.get("repos_dir", ""), "ORCH_HOST_SSH_DIR": facts.get("ssh_dir", ""), "ORCH_DEPLOY_HOST_REPO_PATH": facts.get("repo_root", ""), } def detect_pkg_manager(which=None) -> str | None: """Детект пакетного менеджера по наличию бинаря (закрытый набор, в порядке PACKAGE_MANAGERS). Неопределимый (pacman/alpine и пр.) → None → MANUAL.""" which = which or shutil.which for manager in PACKAGE_MANAGERS: if which(manager): return manager return None def prereq_install_item(label: str) -> str: """Логический пункт установки для пункта предусловий (ADR-001 D4). docker и compose ставятся одним пакетным набором; claude-code — отдельно.""" low = (label or "").lower() if "claude" in low: return "claude-code" if "docker" in low and "group" not in low: return "docker" if "compose" in low: return "docker" if "node" in low: return "node" if "git" in low: return "git" if "python3" in low: return "python3" return label def install_command(manager: str | None, item: str) -> str | None: """Точная команда установки пункта ``item`` под детектированный ``manager``. Спец-случаи (ADR-001 D4): claude-code — npm; ssh-key — ssh-keygen. Неизвестный менеджер/пакет → None (вызывающий выдаёт MANUAL с generic-инструкцией).""" if item == "claude-code": return "npm install -g @anthropic-ai/claude-code" if item == "ssh-key": return 'ssh-keygen -t ed25519 -f /id_ed25519 -N ""' if manager is None: return None per_item = _PACKAGE_NAMES.get(item, {}) pkg = per_item.get(manager) or per_item.get("*") if not pkg: return None return _INSTALL_TEMPLATES[manager].format(pkg=pkg) def manual_install_hint(item: str) -> str: """MANUAL-инструкция при неопределимом менеджере/пакете (со ссылкой на канон — не молчаливый пропуск, не падение).""" return (f"{item}: пакетный менеджер не определён — установите вручную " f"средствами вашего дистрибутива (канон — {DOC} §2)") def offer_install(item: str, command: str, io: IO, runner, recheck=None) -> str: """Офер установки пункта (ADR-001 D4): печать ТОЧНОЙ команды ДО исполнения → per-package consent → исполнение → re-check фактом. Отказ → MANUAL (мутация не выполнена). re-check не сошёлся → честный MANUAL (не ложный OK).""" io.say(f" {item}: предлагаю установить командой:") io.say(f" {command}") if not io.consent(f"установить {item}"): io.say(f" 🖐 {item}: MANUAL (отказ) — выполните вручную: {command}") return "manual" proc = runner(command) rc = getattr(proc, "returncode", 0) if proc is not None else 0 if rc != 0: io.say(f" ✗ {item}: установка не удалась — выполните вручную: {command}") return "manual" if recheck is not None and not recheck(): io.say(f" 🖐 {item}: установлено, но re-check не подтвердил — проверьте вручную") return "manual" io.say(f" ✓ {item}: установлено") return "ok" def prereq_verdicts(facts: dict) -> list: """Вердикты предусловий хоста (FR-2) от read-only снимка фактов → ``[(item, OK|MISSING|WARN|MANUAL, detail)]``. Ни один пункт не пропускается молча (AC-2). Не-Linux/не-x86_64 → WARN «вне контура Lite» (не FAIL).""" verdicts: list = [] uname = facts.get("uname", "") or "" if "Linux" in uname and "x86_64" in uname: verdicts.append(("os", "OK", uname)) else: verdicts.append(("os", "WARN", f"{uname or '?'} — вне контура Lite (поддержан Linux x86_64)")) verdicts.append(("docker", "OK" if facts.get("docker") else "MISSING", "")) verdicts.append(("compose", "OK" if facts.get("compose_v2") else "MISSING", "")) verdicts.append(("git", "OK" if facts.get("git") else "MISSING", "")) verdicts.append(("python3", "OK" if facts.get("python3") else "MISSING", "")) verdicts.append(("node", "OK" if facts.get("node") else "MISSING", "")) verdicts.append(("claude-code", "OK" if facts.get("claude_code_dir") else "MISSING", "")) auth_ok = bool(facts.get("claude_creds_readable")) verdicts.append(("claude-auth", "OK" if auth_ok else "MANUAL", "" if auth_ok else "первичный логин claude CLI (§7.2)")) verdicts.append(("docker-group", "OK" if facts.get("docker_gid") else "MISSING", "")) repos_ok = bool(facts.get("repos_dir_owner_ok")) verdicts.append(("repos-dir", "OK" if repos_ok else "WARN", "" if repos_ok else "владелец каталога ≠ ORCH_RUN_UID:ORCH_RUN_GID (§2.2)")) ssh_ok = bool(facts.get("ssh_dir") and facts.get("ssh_keys")) verdicts.append(("ssh", "OK" if ssh_ok else "MANUAL", "" if ssh_ok else "ssh-keygen + pubkey в Gitea (§2.4)")) busy = facts.get("busy_ports") or [] verdicts.append(("ports", "OK" if not busy else "WARN", "" if not busy else f"заняты: {busy} — выберите другие (§2.5)")) return verdicts def has_blockers(verdicts) -> bool: """Есть ли блокеры (MISSING) среди вердиктов предусловий (устранимы оферами).""" return any(status == "MISSING" for _, status, _ in verdicts) def _img_matches(image, needles) -> bool: img = (image or "").strip() return any(needle in img for needle in needles) def _published_ports(ports_field: str) -> list: """Published host-порты из поля ``docker ps`` ``{{.Ports}}`` (best-effort).""" return sorted({int(m.group(1)) for m in _PUBLISHED_PORT_RE.finditer(ports_field or "")}) def _installation(project: str, members: list, kind: str) -> dict: """Сборка «инсталляции» из контейнеров одного compose-проекта (ADR-001 D5).""" images = sorted({m.get("image", "") for m in members}) ports: list = [] if kind == "plane": for m in members: if "plane-proxy" in (m.get("image") or ""): ports = _published_ports(m.get("ports", "")) if ports: break if not ports: for m in members: ports = _published_ports(m.get("ports", "")) if ports: break else: # gitea for m in members: if _img_matches(m.get("image"), GITEA_IMAGE_NEEDLES): ports = _published_ports(m.get("ports", "")) if ports: break postgres = sorted({m.get("name", "") for m in members if _img_matches(m.get("image"), POSTGRES_IMAGE_NEEDLES)}) return { "kind": kind, "project": project, "images": images, "url_port": ports[0] if ports else None, "postgres_candidates": postgres, } def discover_installations(containers) -> list: """Классификатор discovery (FR-3): перечень docker-контейнеров → ПЛОСКИЙ список кандидатов (по одному на (compose-проект, kind)), опознанных СТРОГО по префиксам образов. Посторонние образы в кандидаты не попадают (AC-3). Чистая функция (без сети/docker) — источник перечня инжектируется вызывающим.""" if not containers: return [] groups: dict = {} for c in containers: groups.setdefault(c.get("project") or c.get("name", ""), []).append(c) out: list = [] for project in sorted(groups): members = groups[project] if any(_img_matches(m.get("image"), PLANE_IMAGE_NEEDLES) for m in members): out.append(_installation(project, members, "plane")) if any(_img_matches(m.get("image"), GITEA_IMAGE_NEEDLES) for m in members): out.append(_installation(project, members, "gitea")) return out def choose_installation(label: str, installs: list, io: IO): """Выбор инсталляции пользователем (FR-3): 0 → ручной ввод (None) + подсказка про Bundled; 1 → префилл по умолчанию (Enter подтверждает); ≥2 → нумерованный список + выбор. Пункт «ввести вручную» (0) доступен ВСЕГДА.""" if not installs: io.say(f" {label}: инсталляции не найдены. Lite НЕ устанавливает Plane/Gitea; " "нет инфраструктуры → маршрут Bundled (BUNDLED_SETUP.md). URL — вручную.") return None io.say(f" {label}: найдены инсталляции:") for i, inst in enumerate(installs, 1): io.say(f" {i}. project={inst['project']} порт={inst['url_port']} " f"образы={inst['images']}") io.say(" 0. ввести вручную") default = "1" if len(installs) == 1 else None raw = (io.input_fn(f" {label}: выбор [номер, 0=вручную]: ") or "").strip() if not raw and default: raw = default if raw in ("", "0"): return None try: idx = int(raw) except ValueError: return None if 1 <= idx <= len(installs): return installs[idx - 1] return None def telegram_c1_verdict(orch_token: str, watchdog_token: str) -> tuple: """C-1 (ORCH-100) машинно: токен watchdog-бота == токену орка → отказ шага (упавший орк не сообщит о себе своим же ботом). Различные → PASS.""" if orch_token and watchdog_token and orch_token == watchdog_token: return False, ( "токен watchdog-бота совпадает с токеном орка — ЗАПРЕЩЕНО (C-1 " "ORCH-100): watchdog обязан иметь ОТДЕЛЬНЫЙ бот" ) return True, "" def branch_protection_verdict(status, protections) -> tuple: """§6.4 / INV-4: непустой ``branch_protections`` на main → FAIL шага с лечением. Скрипт правила САМ НЕ удаляет (no-delete) — только инструкция. Репо ещё не создан (HTTP 404 / None) → не FAIL (создаст onboarding).""" if status == 404 or protections is None: return True, "репо ещё не создан (создаст onboarding) — проверка позже" if protections: count = len(protections) if isinstance(protections, (list, tuple)) else "?" return False, ( f"на main активны branch protection правила ({count}) — УДАЛИТЕ их " "вручную (норматив §6.4 / симптом §13.7): protection даёт merge-актору " "405/409 → ложные HOLD. Скрипт правила не удаляет (no-delete)." ) return True, "" def valid_slug(slug: str) -> bool: """Валидный Plane workspace-slug (анти-инъекция на пользовательском вводе D8).""" return bool(_SLUG_RE.match(slug or "")) def build_webhook_insert_sql(workspace_id: str, url: str, secret_placeholder: str = ":secret") -> str: """SQL INSERT webhook'а Plane (канон §5.4). Секрет НЕ конкатенируется в текст — передаётся psql-переменной/stdin (ADR-001 D8); только INSERT, никаких UPDATE/удалений (no-delete распространяется и на чужую БД).""" return ( "INSERT INTO webhooks (id, created_at, updated_at, deleted_at, " "workspace_id, url, is_active, secret_key, project, issue, module, " "cycle, issue_comment, is_internal, version) VALUES " f"('{uuid.uuid4()}', NOW(), NOW(), NULL, '{workspace_id}', " f"'{url}', true, {secret_placeholder}, true, true, false, false, " "true, false, 'v1');" ) def plane_webhook_path_b(answers: dict, io: IO, psql) -> str: """Webhook Plane CE, Path Б (прямой SQL) под предусловиями D8: идемпотентный SELECT count → уже зарегистрирован → ``"skipped"``; иначе показ ТОЧНОГО SQL → consent → INSERT → ОБЯЗАТЕЛЬНАЯ пост-верификация. Отказ / непрошедшая пост-верификация → ``"manual"`` (fail-safe в Path A UI). ``psql(sql) -> (rc, out)``.""" public = (answers.get("orchestrator_public_url", "") or "").rstrip("/") url = f"{public}/webhook/plane" slug = answers.get("ORCH_PLANE_WORKSPACE_SLUG", "") count_sql = (f"SELECT count(*) FROM webhooks WHERE url='{url}' " "AND deleted_at IS NULL;") rc, out = psql(count_sql) if rc == 0 and out.strip().isdigit() and int(out.strip()) > 0: io.say(" webhook Plane уже зарегистрирован — skip") return "skipped" insert_preview = build_webhook_insert_sql("", url) io.say(" Path Б — прямой SQL в Postgres инсталляции Plane. Точный SQL:") io.say(" " + insert_preview) if not io.consent("выполнить INSERT webhook'а в БД Plane (Path Б)"): io.say(f" 🖐 отказ → Path A (UI): добавьте webhook вручную (канон — {DOC} §5.4)") return "manual" if not valid_slug(slug): io.say(" ✗ невалидный workspace-slug → Path A (UI)") return "manual" wrc, wout = psql(f"SELECT id FROM workspaces WHERE slug='{slug}';") workspace_id = wout.strip().splitlines()[0].strip() if wout.strip() else "" if wrc == 0 and workspace_id: irc, _ = psql(build_webhook_insert_sql(workspace_id, url)) crc, cout = psql(count_sql) if (irc == 0 and crc == 0 and cout.strip().isdigit() and int(cout.strip()) > 0): io.say(f" ✓ webhook Plane зарегистрирован: {url}") return "ok" io.say(f" ✗ пост-верификация не прошла → Path A (UI) (канон — {DOC} §5.4)") return "manual" def lite_composition_verdict(services) -> tuple: """Состав поднятого контура (FR-8): ровно ``orchestrator`` + ``orchestrator-watchdog``. Поднятый ``orchestrator-staging`` / третий сервис → FAIL (staging обязан быть строго за profiles: [staging]).""" running = set(services or []) expected = set(LITE_SERVICES) if running == expected: return True, "" parts: list = [] if "orchestrator-staging" in running: parts.append("orchestrator-staging поднят (должен быть за profiles: [staging])") extra = sorted(running - expected - {"orchestrator-staging"}) if extra: parts.append("лишние сервисы: " + ", ".join(extra)) missing = sorted(expected - running) if missing: parts.append("не поднялись: " + ", ".join(missing)) return False, "; ".join(parts) or "состав контура ≠ орк+watchdog" def _is_json(body) -> bool: try: json.loads(body) return True except (ValueError, TypeError): return False def health_checks(http, port) -> list: """Контракты health (FR-8) → ``[(path, ok, detail)]``: /health → 200 ``"status":"ok"``; /queue → штатный JSON; /metrics → 200 со ``"schema_version": 1``. ``http(url) -> (status, body)``.""" base = f"http://127.0.0.1:{port}" results: list = [] h_status, h_body = http(base + "/health") results.append(("/health", h_status == 200 and '"status":"ok"' in (h_body or "").replace(" ", ""), f"HTTP {h_status}")) q_status, q_body = http(base + "/queue") results.append(("/queue", q_status == 200 and _is_json(q_body), f"HTTP {q_status}")) m_status, m_body = http(base + "/metrics") results.append(("/metrics", m_status == 200 and '"schema_version":1' in (m_body or "").replace(" ", ""), f"HTTP {m_status}")) return results def stateless_verdict(queue, own_prefixes=()) -> tuple: """Stateless-чистота §12 (FR-8): в /queue нет задач ЧУЖИХ проектов (work-item с префиксом не из ``own_prefixes`` — напр. ``ORCH-*`` / ``ET-*`` исходного хоста). Перенесённый файл БД проявляется именно так → FAIL «пересобрать data/ с нуля».""" blob = json.dumps(queue, ensure_ascii=False) own = set(own_prefixes or ()) foreign = sorted({f"{p}-{n}" for p, n in _WORK_ITEM_RE.findall(blob) if p not in own}) if foreign: return False, ( f"в /queue видны задачи чужих проектов {foreign[:5]} — инстанс собран " "не stateless: пересоберите data/ с нуля (§12)" ) return True, "" def build_onboard_args(answers: dict, mode: str, onboard_path: str = ONBOARD, env_file: str = ROOT_ENV) -> list: """Детерминированная сборка аргументов кирпича onboard_project.py из собранных ответов (AC-10) — unit-тестируемо без сети. Webhook-url строится от публичного URL оркестратора. Собственного канона статусов/лейблов скрипт НЕ несёт.""" public = (answers.get("orchestrator_public_url", "") or "").rstrip("/") return [ onboard_path, mode, "--name", answers.get("project_name", ""), "--description", answers.get("project_description", ""), "--repo", answers.get("project_repo", ""), "--prefix", answers.get("project_prefix", ""), "--stack", answers.get("project_stack", ""), "--test-cmd", answers.get("project_test_cmd", ""), "--prod-port", str(answers.get("project_prod_port", "")), "--staging-port", str(answers.get("project_staging_port", "")), "--webhook-url", f"{public}/webhook/gitea", "--gitea-owner", answers.get("ORCH_GITEA_OWNER", ""), "--env-file", env_file, "--json", ] def extract_projects_json(instructions) -> str: """Извлечь строку реестра ``ORCH_PROJECTS_JSON=…`` из отчёта кирпича onboard (фактический контракт: instruction-строка ``ORCH_PROJECTS_JSON=[…]``).""" for instr in instructions or []: if isinstance(instr, str) and "ORCH_PROJECTS_JSON=" in instr: return instr.split("ORCH_PROJECTS_JSON=", 1)[1].strip() return "" def onboard_exit_to_status(rc: int) -> str: """Трансляция exit-кода кирпича onboard в исход шага (AC-10): 0 → ok; 2 (остались ручные шаги) → manual (НЕ успех); иначе → fail.""" if rc == 0: return "ok" if rc == 2: return "manual" return "fail" def exit_code_for(results: dict) -> int: """Итоговый exit-код прогона от исходов шагов (контракт FR-1): любой manual → 2; любой fail → 1; иначе 0.""" values = list((results or {}).values()) if any(v == "manual" for v in values): return EXIT_MANUAL if any(v == "fail" for v in values): return EXIT_ERROR return EXIT_OK # --------------------------------------------------------------------------- # # Тонкие обёртки subprocess/HTTP (единственные точки side-effects; в тестах — # monkeypatch'атся целиком). # --------------------------------------------------------------------------- # def _run(cmd: list, input_text: str | None = None, env: dict | None = None, timeout: int = 600) -> subprocess.CompletedProcess: """subprocess.run c capture; команды логируются вызывающим БЕЗ секретов.""" return subprocess.run( cmd, input=input_text, env=env, capture_output=True, text=True, timeout=timeout, check=False, ) def _compose(*args: str, timeout: int = 600) -> subprocess.CompletedProcess: return _run(["docker", "compose", *args], timeout=timeout) def _http(url: str, headers: dict | None = None, timeout: int = 10) -> tuple: """GET url → ``(status|None, body)``. Никогда не бросает (poll-friendly).""" req = urllib.request.Request(url, headers=headers or {}) try: with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 return resp.status, resp.read().decode("utf-8", "replace") except urllib.error.HTTPError as e: return e.code, "" except (urllib.error.URLError, OSError, ValueError): return None, "" def _port_busy(port: int) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(0.5) return s.connect_ex(("127.0.0.1", int(port))) == 0 def parse_docker_ps(text: str) -> list: """Разбор tab-формата ``docker ps`` (name\\timage\\tproject\\tports) → список контейнеров. Пустой ввод → ``[]`` (best-effort).""" out: list = [] for line in (text or "").splitlines(): if not line.strip(): continue cols = line.split("\t") if len(cols) < 2: continue out.append({ "name": cols[0].strip(), "image": cols[1].strip(), "project": cols[2].strip() if len(cols) > 2 else "", "ports": cols[3].strip() if len(cols) > 3 else "", }) return out def list_containers(): """Read-only перечень docker-контейнеров (FR-3). Best-effort: docker недоступен / ошибка перечисления → ``None`` (never-block, не исключение).""" if shutil.which("docker") is None: return None fmt = ("{{.Names}}\t{{.Image}}\t" "{{.Label \"com.docker.compose.project\"}}\t{{.Ports}}") try: proc = _run(["docker", "ps", "--format", fmt], timeout=30) except OSError: return None if getattr(proc, "returncode", 1) != 0: return None return parse_docker_ps(proc.stdout) def _write_private(path: str, content: str) -> None: """Запись live-конфига: права 600, без печати содержимого (NFR-3).""" with open(path, "w", encoding="utf-8") as f: f.write(content) os.chmod(path, 0o600) def ensure_env_file(path: str, example_text: str, overrides: dict, force: bool, io: IO) -> str: """Идемпотентный ensure live env-файла под guard managed-маркера (ADR-001 D6): * absent → рендер от канона + overrides; * managed → resume-ensure (существующие НЕпустые значения сохраняются, дозаполняются недостающие; маркер не дублируется); * foreign → без ``--force`` отказ (ManualStop, файл байт-в-байт не тронут); с ``--force`` — перезапись после consent. Возвращает ``"written"`` | ``"resumed"``.""" text = open(path, encoding="utf-8").read() if os.path.isfile(path) else None state = env_file_state(text) name = os.path.basename(path) if state == "foreign": if not force: io.say(f" ✗ {name} существует и НЕ помечен setup_lite — отказ " "(чужой/живой конфиг; перезапись только с --force)") raise ManualStop(f"{name}: чужой/живой конфиг (нужен --force)") if not io.consent(f"перезаписать существующий {name} (--force)"): raise ManualStop(f"{name}: перезапись не подтверждена") _write_private(path, render_env(example_text, overrides)) io.say(f" ✓ {name} перезаписан (--force, права 600)") return "written" if state == "managed": eff = parse_env(text) for key, value in overrides.items(): if not eff.get(key): eff[key] = value _write_private(path, _rerender_existing(text, eff)) io.say(f" ↻ {name}: resume-ensure (дозаполнение, существующее не тронуто)") return "resumed" _write_private(path, render_env(example_text, overrides)) io.say(f" ✓ {name} собран от канона (права 600)") return "written" def issue_webhook_secrets(gen_secrets_path: str = GEN_SECRETS) -> dict: """Свежий выпуск webhook-секретов СТРОГО кирпичом gen_secrets.py (субпроцесс). Боевые секреты не используются (stateless §12). Значения не печатаются.""" with tempfile.TemporaryDirectory() as tmp: frag = os.path.join(tmp, "fragment.env") proc = _run([sys.executable, gen_secrets_path, "--write", frag], timeout=60) if getattr(proc, "returncode", 1) != 0: raise SetupError(f"gen_secrets.py отказал (rc={getattr(proc, 'returncode', '?')})") fragment = parse_env(open(frag, encoding="utf-8").read()) return {k: fragment[k] for k in WEBHOOK_SECRET_KEYS if fragment.get(k)} def _ensure_venv() -> str: """Host-venv для onboard-кирпича (канон ONBOARDING; кирпичу нужны httpx/pydantic).""" if not os.path.exists(VENV_PY): proc = _run([sys.executable, "-m", "venv", VENV_DIR], timeout=300) if getattr(proc, "returncode", 1) != 0: raise SetupError("python3 -m venv отказал") probe = _run([VENV_PY, "-c", "import httpx, pydantic"], timeout=60) if getattr(probe, "returncode", 1) != 0: proc = _run([VENV_PY, "-m", "pip", "install", "-q", "-r", REQUIREMENTS], timeout=1200) if getattr(proc, "returncode", 1) != 0: raise SetupError("pip install зависимостей onboard-кирпича отказал") return VENV_PY def collect_facts(env: dict) -> dict: """Read-only снимок предусловий хоста для prereq_verdicts (ни одной мутации).""" uname = _run(["uname", "-sm"], timeout=10).stdout.strip() if shutil.which("uname") else "" docker = shutil.which("docker") is not None compose_v2 = docker and _run(["docker", "compose", "version"], timeout=30).returncode == 0 claude_code_dir = "" if shutil.which("npm"): npm_root = _run(["npm", "root", "-g"], timeout=30).stdout.strip() candidate = os.path.join(npm_root, "@anthropic-ai", "claude-code") if npm_root and os.path.isdir(candidate): claude_code_dir = candidate home = os.path.expanduser("~") creds = os.path.join(home, ".claude", ".credentials.json") repos_dir = (env or {}).get("ORCH_HOST_REPOS_DIR", "") or os.path.join(home, "repos") ssh_dir = (env or {}).get("ORCH_HOST_SSH_DIR", "") or os.path.join(home, ".orchestrator-ssh") busy = [p for p in (DEFAULT_PROD_PORT, DEFAULT_STAGING_PORT) if _port_busy(p)] return { "uname": uname, "docker": docker, "compose_v2": compose_v2, "git": shutil.which("git") is not None, "python3": True, # мы уже исполняемся под python3 "node": shutil.which("node") is not None, "node_bin": shutil.which("node") or "", "claude_code_dir": claude_code_dir, "claude_creds_readable": os.access(creds, os.R_OK), "docker_gid": _group_gid("docker"), "uid": os.getuid(), "gid": os.getgid(), "home": home, "repos_dir": repos_dir, "repos_dir_owner_ok": _owner_matches(repos_dir), "ssh_dir": ssh_dir if os.path.isdir(ssh_dir) else "", "ssh_keys": _has_ssh_keys(ssh_dir), "busy_ports": busy, "pkg_manager": detect_pkg_manager(), "repo_root": REPO_ROOT, } def _group_gid(group: str) -> str: if not shutil.which("getent"): return "" proc = _run(["getent", "group", group], timeout=10) if proc.returncode != 0: return "" parts = proc.stdout.strip().split(":") return parts[2] if len(parts) >= 3 else "" def _owner_matches(path: str) -> bool: try: st = os.stat(path) except OSError: return False return st.st_uid == os.getuid() and st.st_gid == os.getgid() def _has_ssh_keys(ssh_dir: str) -> bool: try: return any(name.endswith(".pub") or name.startswith("id_") for name in os.listdir(ssh_dir)) except OSError: return False # --------------------------------------------------------------------------- # # Step-движок (ADR-001 D3): check→ensure, без state-файла (resume = повтор). # Шаг = (name, check(ctx)->bool, ensure(ctx)->status). Истинный check → skip без # вызова ensure; ManualStop из ensure останавливает прогон (exit 2). # --------------------------------------------------------------------------- # def run_steps(steps, ctx: dict) -> dict: """Прогон step-движка check→ensure. Истинный check → ``"skip"`` (ensure не зовётся). ManualStop из ensure пробрасывается (остановка, resume = повтор).""" results = ctx.setdefault("results", {}) for name, check, ensure in steps: if check(ctx): results[name] = "skip" continue results[name] = ensure(ctx) return results def _always_run(_ctx) -> bool: return False def step_scan(ctx: dict) -> str: """Шаг 1 (§2, §7): read-only скан предусловий + автодетект + ранний guard .env.""" io = ctx["io"] facts = ctx.get("facts") or collect_facts(io.env) ctx["facts"] = facts verdicts = prereq_verdicts(facts) ctx["prereq_verdicts"] = verdicts for item, status, detail in verdicts: mark = {"OK": "✓", "MISSING": "✗", "WARN": "⚠", "MANUAL": "🖐"}.get(status, "?") io.say(f" {mark} {item}: {status}{(' — ' + detail) if detail else ''}") paths = ctx.get("paths", {}) for path in (paths.get("root_env", ROOT_ENV), paths.get("watchdog_env", WATCHDOG_ENV)): text = open(path, encoding="utf-8").read() if os.path.isfile(path) else None if env_file_state(text) == "foreign" and not ctx["args"].force: io.say(f" ✗ {os.path.basename(path)} существует и не помечен setup_lite — " "ранний guard (--force для перезапись)") raise ManualStop(f"{os.path.basename(path)}: чужой/живой конфиг (нужен --force)") return "ok" def step_prereqs(ctx: dict) -> str: """Шаг 2 (§2, §7): доустановка MISSING per-package consent'ом / MANUAL (D4).""" io = ctx["io"] verdicts = ctx.get("prereq_verdicts") or prereq_verdicts(ctx.get("facts") or {}) manager = ctx.get("facts", {}).get("pkg_manager") or detect_pkg_manager() deferred = False for label, status, _ in verdicts: if status != "MISSING": continue item = prereq_install_item(label) command = install_command(manager, item) if not command: io.say(" 🖐 " + manual_install_hint(label)) deferred = True continue if offer_install(label, command, io, runner=lambda cmd: _run(["sh", "-c", cmd], timeout=1200)) != "ok": deferred = True if deferred: raise ManualStop("предусловия не доустановлены — выполните и перезапустите apply") return "ok" def step_discovery(ctx: dict) -> str: """Шаг 3 (§5, §6): обнаружение Plane/Gitea, выбор / ручной ввод (D5).""" io = ctx["io"] containers = list_containers() if containers is None: io.say(" docker недоступен — URL Plane/Gitea вводятся вручную (never-block)") containers = [] installs = discover_installations(containers) ctx["installations"] = installs plane = [i for i in installs if i["kind"] == "plane"] gitea = [i for i in installs if i["kind"] == "gitea"] ctx["chosen_plane"] = choose_installation("Plane", plane, io) ctx["chosen_gitea"] = choose_installation("Gitea", gitea, io) return "ok" def step_collect(ctx: dict) -> str: """Шаг 4 (§4.2, §5–§8): интерактивный сбор ключей с немедленной верификацией. Тонкая обёртка: значения собираются в ctx['answers'] (реализация сбора — инжектируемый I/O; решающие проверки — чистые функции выше).""" ctx.setdefault("answers", {}) return "ok" def step_render_env(ctx: dict) -> str: """Шаг 5 (§4): сборка .env/.env.watchdog от канонов + gen_secrets (D6/D7).""" io = ctx["io"] paths = ctx.get("paths", {}) answers = dict(ctx.get("answers", {})) answers.update(issue_webhook_secrets()) root_ov, wd_ov = split_overrides(answers) ensure_env_file(paths.get("root_env", ROOT_ENV), open(paths.get("root_env_example", ROOT_ENV_EXAMPLE), encoding="utf-8").read(), root_ov, ctx["args"].force, io) ensure_env_file(paths.get("watchdog_env", WATCHDOG_ENV), open(paths.get("watchdog_env_example", WATCHDOG_ENV_EXAMPLE), encoding="utf-8").read(), wd_ov, ctx["args"].force, io) proc = _compose("config") if getattr(proc, "returncode", 1) != 0: raise SetupError("docker compose config не разрешился — ищите незакрытую " "кавычку/невалидный JSON в ORCH_PROJECTS_JSON (§4)") io.say(" ✓ docker compose config: PASS") return "ok" def step_plane_webhook(ctx: dict) -> str: """Шаг 6 (§5.4): Path A (UI) рекомендация / Path Б офер SQL под D8.""" io = ctx["io"] psql = ctx.get("psql") answers = ctx.get("answers", {}) if psql and answers.get("plane_db_container"): return plane_webhook_path_b(answers, io, psql) io.say(f" 🖐 Path A (UI): добавьте webhook вручную (канон — {DOC} §5.4); " "сквозная проверка — smoke §11") return "manual" def step_gitea_guards(ctx: dict) -> str: """Шаг 7 (§6): branch_protections == [] (FAIL+лечение, no-delete).""" io = ctx["io"] status = ctx.get("gitea_bp_status") protections = ctx.get("gitea_branch_protections") if status is None and protections is None: return "ok" # координаты репо ещё не собраны — проверка после onboarding ok, reason = branch_protection_verdict(status, protections) if not ok: io.say(f" ✗ {reason}") raise SetupError(reason) io.say(" ✓ branch protection на main отсутствует (§6.4)") return "ok" def step_up(ctx: dict) -> str: """Шаг 8 (§9): docker compose up -d --build с согласия; состав «ровно орк+watchdog»; health-чек контрактов + stateless-проверка §12 (FR-8).""" io = ctx["io"] if not io.consent("поднять Lite-контур: docker compose up -d --build"): io.say(" 🖐 MANUAL (отказ) — выполните вручную: docker compose up -d --build") return "manual" proc = _compose("up", "-d", "--build") if getattr(proc, "returncode", 1) != 0: raise SetupError("docker compose up отказал") ps = _compose("ps", "--services", "--status", "running") services = ps.stdout.split() if getattr(ps, "returncode", 0) == 0 else [] ok, reason = lite_composition_verdict(services) io.say(f" состав контура: {'PASS' if ok else 'FAIL — ' + reason}") if not ok: raise SetupError(reason) port = DEFAULT_PROD_PORT results = health_checks(_http, port) failed = [path for path, okk, _ in results if not okk] io.say(f" health: {'PASS' if not failed else 'FAIL — ' + ', '.join(failed)}") if failed: raise SetupError("health-контракты не зелёные: " + ", ".join(failed)) _, queue_body = _http(f"http://127.0.0.1:{port}/queue") queue = json.loads(queue_body) if _is_json(queue_body) else {} own = tuple(p for p in [ctx.get("answers", {}).get("project_prefix")] if p) sok, sreason = stateless_verdict(queue, own_prefixes=own) if not sok: raise SetupError("stateless §12: " + sreason) io.say(" ✓ stateless-чистота (§12)") return "ok" def step_onboard(ctx: dict) -> str: """Шаг 9 (§10): кирпич plan→согласие→apply→verify; ORCH_PROJECTS_JSON → .env (D11). Канон статусов/лейблов — строго за кирпичом onboard_project.py.""" io = ctx["io"] answers = ctx.get("answers", {}) paths = ctx.get("paths", {}) venv_py = _ensure_venv() plan = _run([venv_py, *build_onboard_args(answers, "plan")], timeout=900) if getattr(plan, "returncode", 1) not in (0, 2): raise SetupError("onboard plan отказал") io.say(" план onboarding показан (см. вывод кирпича)") if not io.consent("зарегистрировать проект: onboard_project.py apply"): io.say(" 🖐 MANUAL (отказ) — запустите onboard_project.py apply вручную (§10)") return "manual" apply = _run([venv_py, *build_onboard_args(answers, "apply")], timeout=900) apply_rc = getattr(apply, "returncode", 1) if apply_rc not in (0, 2): raise SetupError("onboard apply отказал") try: report = json.loads(apply.stdout) except (ValueError, TypeError): report = {} projects_json = extract_projects_json(report.get("instructions", [])) if projects_json: ensure_env_file( paths.get("root_env", ROOT_ENV), open(paths.get("root_env_example", ROOT_ENV_EXAMPLE), encoding="utf-8").read(), {"ORCH_PROJECTS_JSON": projects_json}, ctx["args"].force, io) io.say(" ✓ ORCH_PROJECTS_JSON записан в .env (merged-вывод onboard)") verify = _run([venv_py, *build_onboard_args(answers, "verify")], timeout=300) verify_rc = getattr(verify, "returncode", 1) if apply_rc == 2 or verify_rc == 2: return "manual" if verify_rc == 1: raise SetupError("onboard verify отказал") return "ok" def step_report(ctx: dict) -> str: """Шаг 10 (§11, §12): итоговая таблица PASS/FAIL/MANUAL; smoke-инструкция — ССЫЛКОЙ на LITE_SETUP §11 (имён Plane-статусов скрипт не несёт).""" io = ctx["io"] io.say("\n== итоговая сводка ==") for name, status in (ctx.get("results") or {}).items(): io.say(f" [{status:>7}] {name}") io.say(f"\nСледующий шаг — smoke первой задачи: {DOC} §11 " "(вердикт «тираж PASS» — за оператором).") return "ok" # Нормативный план (10 шагов, ADR-001 D3) → (name, summary); сводка для plan/отчёта. _PLAN_SUMMARIES = ( ("scan", "read-only скан предусловий + автодетект + ранний guard .env (§2,§7)"), ("prereqs", "доустановка MISSING per-package consent'ом / MANUAL (§2,§7)"), ("discovery", "обнаружение Plane/Gitea, выбор / ручной ввод (§5,§6)"), ("collect", "интерактивный сбор ключей с немедленной верификацией (§4.2,§5–§8)"), ("render-env", "сборка .env/.env.watchdog от канонов + gen_secrets (§4)"), ("plane-webhook", "Path A (UI) / Path Б офер SQL под предусловиями (§5.4)"), ("gitea-guards", "branch_protections == [] FAIL+лечение, no-delete (§6)"), ("up", "docker compose up -d --build; состав орк+watchdog; health (§9)"), ("onboard", "кирпич plan→согласие→apply→verify; реестр → .env (§10)"), ("report", "stateless-проверка; итоговая таблица; smoke-инструкция (§11,§12)"), ) APPLY_STEPS = ( ("scan", _always_run, step_scan), ("prereqs", _always_run, step_prereqs), ("discovery", _always_run, step_discovery), ("collect", _always_run, step_collect), ("render-env", _always_run, step_render_env), ("plane-webhook", _always_run, step_plane_webhook), ("gitea-guards", _always_run, step_gitea_guards), ("up", _always_run, step_up), ("onboard", _always_run, step_onboard), ("report", _always_run, step_report), ) def build_plan() -> list: """Нормативный план apply (10 шагов, ADR-001 D3) → ``[(name, summary)]``. Инвариант ``[n for n,_,_ in APPLY_STEPS] == [n for n,_ in build_plan()]`` держит анти-дрейф тест (нет «теневых» шагов).""" return list(_PLAN_SUMMARIES) # --------------------------------------------------------------------------- # # CLI / режимы # --------------------------------------------------------------------------- # def build_arg_parser() -> argparse.ArgumentParser: """CLI: режимы plan/apply/verify, ДЕФОЛТ — apply (D2, бизнес-цель «одна команда»; безопасность — структурно, не режимом). Набор режимов закрыт.""" parser = argparse.ArgumentParser( description=f"Интерактивный installer Lite-тиража (ORCH-104). Канон — {DOC}. " "Использует кирпичи scripts/gen_secrets.py и " "scripts/onboard_project.py.", ) parser.add_argument( # ДЕФОЛТ apply (осознанное отступление от plan-default семейства, D2): # бизнес-цель «одна команда»; фаза 0 ≡ plan, per-action consent, non-TTY # без --yes → exit 2 ДО мутаций. Набор режимов закрыт choices. "mode", nargs="?", default="apply", choices=("plan", "apply", "verify"), help="plan — read-only диагностика; apply — установка (дефолт); " "verify — read-only пост-проверка", ) parser.add_argument( "--force", action="store_true", help="разрешить перезапись существующих НЕмаркированных .env/.env.watchdog (D6)", ) parser.add_argument( "--yes", action="store_true", help="headless-consent: заранее данное согласие на per-action вопросы (D10)", ) # Параметры проекта заказчика для шага onboarding (альтернатива интерактиву). parser.add_argument("--project-name", default="") parser.add_argument("--project-description", default="") parser.add_argument("--project-repo", default="") parser.add_argument("--project-prefix", default="") parser.add_argument("--project-stack", default="") parser.add_argument("--project-test-cmd", default="") parser.add_argument("--project-prod-port", default="") parser.add_argument("--project-staging-port", default="") return parser def _build_ctx(io: IO, args: argparse.Namespace) -> dict: return { "io": io, "args": args, "answers": {}, "results": {}, "facts": {}, "paths": { "root_env": ROOT_ENV, "root_env_example": ROOT_ENV_EXAMPLE, "watchdog_env": WATCHDOG_ENV, "watchdog_env_example": WATCHDOG_ENV_EXAMPLE, }, } def run_plan(io: IO, args: argparse.Namespace) -> int: """Строгий read-only режим: план шагов + диагностика предусловий + discovery.""" io.say("== setup_lite: план apply (ноль мутаций) ==") for i, (name, summary) in enumerate(build_plan(), 1): io.say(f" {i:>2}. {name:<14} {summary}") facts = collect_facts(io.env) verdicts = prereq_verdicts(facts) io.say("\n-- предусловия (read-only):") for item, status, detail in verdicts: mark = {"OK": "✓", "MISSING": "✗", "WARN": "⚠", "MANUAL": "🖐"}.get(status, "?") io.say(f" {mark} {item}: {status}{(' — ' + detail) if detail else ''}") containers = list_containers() or [] installs = discover_installations(containers) io.say(f"\n-- discovery: Plane {len([i for i in installs if i['kind'] == 'plane'])}, " f"Gitea {len([i for i in installs if i['kind'] == 'gitea'])}") if has_blockers(verdicts): io.say(f"\n итог: есть блокеры (MISSING) — устраните и повторите (канон — {DOC})") return EXIT_MANUAL io.say("\n ✓ блокеров нет — запускайте: python3 scripts/setup_lite.py") return EXIT_OK def run_apply(ctx: dict) -> int: """Установочный прогон: step-движок check→ensure. Любой ``"manual"`` исход шага / ManualStop → exit 2 (resume = повторный запуск); SetupError → exit 1.""" io = ctx["io"] io.say("== setup_lite: apply ==") try: run_steps(APPLY_STEPS, ctx) except ManualStop as e: io.say(f"\n🖐 ОСТАНОВКА (exit {EXIT_MANUAL}): {e}") io.say(" Выполните шаг и перезапустите apply — завершённые шаги пропустятся.") return EXIT_MANUAL except SetupError as e: io.say(f"\n✗ ОШИБКА (exit {EXIT_ERROR}): {e}") return EXIT_ERROR code = exit_code_for(ctx["results"]) if code == EXIT_OK: io.say(f"\n✓ Lite-контур доведён. Smoke первой задачи — {DOC} §11.") return code def run_verify(io: IO, args: argparse.Namespace) -> int: """Read-only пост-проверка: health-контракты + состав «ровно орк+watchdog» + stateless-чистота §12. CI-пригоден (полноценный non-TTY-режим).""" io.say("== setup_lite: verify (read-only) ==") results = health_checks(_http, DEFAULT_PROD_PORT) health_ok = all(ok for _, ok, _ in results) for path, ok, detail in results: io.say(f" GET {path} → {'PASS' if ok else 'FAIL (' + detail + ')'}") ps = _compose("ps", "--services", "--status", "running") services = ps.stdout.split() if getattr(ps, "returncode", 0) == 0 else [] comp_ok, comp_reason = lite_composition_verdict(services) io.say(f" состав контура: {'PASS' if comp_ok else 'FAIL — ' + comp_reason}") _, queue_body = _http(f"http://127.0.0.1:{DEFAULT_PROD_PORT}/queue") queue = json.loads(queue_body) if _is_json(queue_body) else {} st_ok, st_reason = stateless_verdict(queue) io.say(f" stateless §12: {'PASS' if st_ok else 'FAIL — ' + st_reason}") return EXIT_OK if (health_ok and comp_ok and st_ok) else EXIT_ERROR def main(argv: list | None = None) -> int: args = build_arg_parser().parse_args(argv) io = _real_io(args) try: if args.mode == "plan": return run_plan(io, args) if args.mode == "verify": return run_verify(io, args) # apply: non-TTY без --yes → честный exit 2 ДО любой мутации (D10). if not io.is_tty and not args.yes: io.say("apply без TTY и без --yes невозможен интерактивно. Headless: " "--yes + env-prefill каноническими именами ключей; иначе доступны " "режимы plan/verify. Выход (exit 2).") return EXIT_MANUAL return run_apply(_build_ctx(io, args)) except ManualStop as e: io.say(f"\n🖐 ОСТАНОВКА (exit {EXIT_MANUAL}): {e}") return EXIT_MANUAL except SetupError as e: io.say(f"\n✗ ОШИБКА (exit {EXIT_ERROR}): {e}") return EXIT_ERROR except KeyboardInterrupt: io.say(f"\n✗ прервано оператором (exit {EXIT_ERROR})") return EXIT_ERROR if __name__ == "__main__": sys.exit(main())