Files
orchestrator/tests/test_onboarding_script.py

606 lines
23 KiB
Python

"""ORCH-009: tests of the operator onboarding CLI (`scripts/onboard_project.py`).
Covers test-plan TC-02 (live-copy of the canon), TC-09..TC-11 (render /
anti-leak / referential integrity), TC-12 (registry round-trip through the
actual parser), TC-13..TC-16 (plan: Plane/Gitea completeness + dry-run with
zero mutations), TC-17..TC-18 (idempotent & safe apply).
All tests are deterministic and offline (NFR-5): the Plane/Gitea clients are
replaced with in-memory fakes; git is replaced with a recording runner. The
script module is loaded via importlib (pattern: tests/test_staging_check_b6.py).
"""
import importlib.util
import json
import os
import re
import pytest
# Parent of the directory that holds this test file; used as the repository root.
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Path of the onboarding CLI script that the tests load and inspect.
_SCRIPT_PATH = os.path.join(_REPO_ROOT, "scripts", "onboard_project.py")
def _load_module():
    """Load the onboarding script at ``_SCRIPT_PATH`` as a module object.

    The script is executed from its file path under the module name
    ``onboard_project``; it is not registered in ``sys.modules``.

    Returns:
        The executed module object.

    Raises:
        ImportError: if importlib cannot produce a spec or a loader for
            ``_SCRIPT_PATH``.
    """
    spec = importlib.util.spec_from_file_location("onboard_project", _SCRIPT_PATH)
    # spec_from_file_location may return None, and a spec may carry no loader;
    # fail with a readable message instead of an AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"cannot load the onboarding script from {_SCRIPT_PATH!r}"
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
@pytest.fixture(scope="module")
def mod():
    """Module-scoped fixture: the onboarding script loaded via ``_load_module``."""
    loaded_script = _load_module()
    return loaded_script
# Webhook URL passed as `webhook_url` to run_plan / run_apply / run_verify and
# used as the hook URL in the pre-seeded FakeGitea hooks.
_WEBHOOK_URL = "https://orchestrator.example.org/webhook/gitea"
def _params(**over):
    """Return a fresh onboarding parameter dict for the demo project.

    Args:
        **over: parameter names mapped to values that replace a default or
            add a new key.

    Returns:
        A new dict holding the ten defaults below, updated with ``over``.
    """
    defaults = dict(
        PROJECT_NAME="Demo Project",
        PROJECT_DESCRIPTION="Демо-проект для проверки онбординга",
        REPO="demo-project",
        GITEA_OWNER="admin",
        WORK_ITEM_PREFIX="DEMO",
        PLANE_PROJECT_ID="11111111-2222-3333-4444-555555555555",
        STACK="Python 3.12 + FastAPI + SQLite",
        TEST_CMD="pytest tests/ -q",
        PROD_PORT="8600",
        STAGING_PORT="8601",
    )
    return {**defaults, **over}
def _step(report, step_id):
    """Return the first step of ``report`` whose ``id`` equals ``step_id``.

    Fails with an assertion that lists every step id present in the report
    when no step matches.
    """
    found = next((entry for entry in report.steps if entry.id == step_id), None)
    assert found is not None, f"report has no step {step_id!r}: {[s.id for s in report.steps]}"
    return found
# --------------------------------------------------------------------------- #
# Fakes — the only network touchpoints of the script, replaced in-memory.
# --------------------------------------------------------------------------- #
class FakePlane:
    """In-memory stand-in for the Plane client used by the onboarding script.

    Holds at most one project plus lists of states and labels. Each successful
    ``create_*`` call is appended to ``mutations``; a ``create_*`` call whose
    ``refuse_create_*`` flag is set raises ``mod.ManualStep`` instead and
    records nothing.
    """

    def __init__(self, mod, project=None, states=None, labels=None,
                 refuse_create_states=False, refuse_create_labels=False,
                 refuse_create_project=False):
        # The script module; only its ManualStep exception is used here.
        self._mod = mod
        self.project = project
        # Copies, so the caller's sequences are never mutated by create_*.
        self.states = list(states) if states else []
        self.labels = list(labels) if labels else []
        self.mutations = []
        self.refuse_create_states = refuse_create_states
        self.refuse_create_labels = refuse_create_labels
        self.refuse_create_project = refuse_create_project

    # ---- read-only probes -------------------------------------------------

    def get_project(self, project_id):
        """Return the stored project when its ``id`` matches, else ``None``."""
        current = self.project
        if not current or current.get("id") != project_id:
            return None
        return current

    def find_project_by_identifier(self, identifier):
        """Return the stored project when its ``identifier`` matches, else ``None``."""
        current = self.project
        if not current or current.get("identifier") != identifier:
            return None
        return current

    def list_states(self, project_id):
        """Return a copy of the stored states (``project_id`` is ignored)."""
        return self.states[:]

    def list_labels(self, project_id):
        """Return a copy of the stored labels (``project_id`` is ignored)."""
        return self.labels[:]

    # ---- mutations --------------------------------------------------------

    def create_project(self, name, identifier):
        """Record and store a new project, or raise ``ManualStep`` when refused."""
        if self.refuse_create_project:
            raise self._mod.ManualStep("Plane CE: projects API not available")
        self.mutations.append(("create_project", name, identifier))
        created = {"id": "plane-uuid-created", "name": name, "identifier": identifier}
        self.project = created
        return created

    def create_state(self, project_id, name, group):
        """Record and append a new state, or raise ``ManualStep`` when refused."""
        if self.refuse_create_states:
            raise self._mod.ManualStep("Plane CE: states API not available")
        self.mutations.append(("create_state", name, group))
        created = {"id": f"uuid-{name}", "name": name, "group": group}
        self.states.append(created)
        return created

    def create_label(self, project_id, name):
        """Record and append a new label, or raise ``ManualStep`` when refused."""
        if self.refuse_create_labels:
            raise self._mod.ManualStep("Plane CE: labels API not available")
        self.mutations.append(("create_label", name))
        created = {"id": f"uuid-{name}", "name": name}
        self.labels.append(created)
        return created
class FakeGitea:
    """In-memory stand-in for the Gitea client used by the onboarding script.

    Stores one repo, a list of hooks and a mapping of repo path -> file text.
    ``create_repo`` and ``create_hook`` are recorded in ``mutations``.
    """

    def __init__(self, repo=None, hooks=None, files=None):
        self.repo = repo
        self.hooks = list(hooks) if hooks else []
        # repo path -> text; read by the verify helpers below
        self.files = dict(files) if files else {}
        self.mutations = []

    def get_repo(self, owner, repo):
        """Return the stored repo regardless of ``owner`` / ``repo``."""
        return self.repo

    def list_hooks(self, owner, repo):
        """Return a copy of the stored hooks."""
        return self.hooks[:]

    def create_repo(self, owner, name, description=""):
        """Record the call and store a new empty repo (``description`` is unused)."""
        self.mutations.append(("create_repo", owner, name))
        created = {"name": name, "owner": {"login": owner}, "empty": True}
        self.repo = created
        return created

    def create_hook(self, owner, repo, url, secret, events):
        """Record the call and store an active hook; ``secret`` is not kept."""
        self.mutations.append(("create_hook", url, tuple(events)))
        created = {
            "id": 1,
            "active": True,
            "config": {"url": url},
            "events": list(events),
        }
        self.hooks.append(created)
        return created

    # ---- verify helpers ---------------------------------------------------

    def get_file_text(self, owner, repo, path):
        """Return the stored text for ``path``, or ``None`` when absent."""
        return self.files.get(path)

    def list_dir(self, owner, repo, path):
        """Return sorted first-level entry names under ``path``, or ``None`` if none."""
        prefix = path.rstrip("/") + "/"
        children = set()
        for rel in self.files:
            if rel.startswith(prefix):
                # keep only the path component directly below the prefix
                children.add(rel[len(prefix):].partition("/")[0])
        return sorted(children) or None
def _full_states(mod):
    """Build one state dict (``id``, ``name``, ``group``) per ``mod.STATE_GROUPS`` entry."""
    states = []
    for state_name, state_group in mod.STATE_GROUPS.items():
        states.append(
            {"id": f"uuid-{state_name}", "name": state_name, "group": state_group}
        )
    return states
def _full_labels(mod):
    """Build one label dict (``id``, ``name``) per name from ``mod.label_names()``."""
    labels = []
    for label_name in mod.label_names():
        labels.append({"id": f"uuid-{label_name}", "name": label_name})
    return labels
# --------------------------------------------------------------------------- #
# TC-02 — materialisation live-copies the canon (BR-2 / D3)
# --------------------------------------------------------------------------- #
def test_tc02_materialise_live_copies_canon(mod, tmp_path):
    """TC-02: materialize_kit writes the templates/standards and copies the canon as-is."""
    dest = tmp_path / "repo"
    written = mod.materialize_kit(_params(), str(dest))
    assert written, "materialize_kit wrote nothing"

    docs_dir = dest / "docs"
    templates = os.listdir(docs_dir / "_templates")
    standards = os.listdir(docs_dir / "_standards")
    assert len(templates) >= 16, f"expected >=16 canonical skeletons, got {len(templates)}"
    assert len(standards) >= 3, f"expected >=3 standards, got {len(standards)}"

    # Each standard must read back (as UTF-8 text) equal to the file in this
    # checkout's docs/_standards directory.
    canon_dir = os.path.join(_REPO_ROOT, "docs", "_standards")
    for rel in ("PIPELINE_DOCS.md", "HANDOFF_PROTOCOL.md", "TRACEABILITY.md"):
        with open(os.path.join(canon_dir, rel), encoding="utf-8") as canon_file:
            canon = canon_file.read()
        with open(docs_dir / "_standards" / rel, encoding="utf-8") as copy_file:
            copied = copy_file.read()
        assert copied == canon, f"{rel} must be live-copied verbatim (BR-2)"
# --------------------------------------------------------------------------- #
# TC-09 / TC-10 — render: no unresolved placeholders, no orc leaks (AC-5)
# --------------------------------------------------------------------------- #
def test_tc09_render_resolves_all_placeholders(mod):
    """TC-09: no rendered kit file keeps a placeholder that find_unresolved reports."""
    rendered = mod.render_kit_in_memory(_params())
    assert rendered, "render_kit_in_memory returned nothing"
    for rel in rendered:
        unresolved = mod.find_unresolved(rendered[rel])
        assert not unresolved, f"{rel} keeps unresolved placeholders: {unresolved}"
def test_tc10_no_orchestrator_specific_leaks(mod):
    """TC-10: the rendered kit has no orchestrator literals and carries the demo params."""
    kit = mod.render_kit_in_memory(_params())
    joined = "\n".join(kit.values())

    # Nothing orchestrator-specific may survive rendering.
    orch_item = re.search(r"ORCH-\d", joined)
    assert orch_item is None, (
        "rendered kit leaks an ORCH-NNN work-item literal where the project "
        "prefix belongs (TC-10)"
    )
    assert not ("8500" in joined or "8501" in joined), (
        "rendered kit leaks the orchestrator prod/staging ports"
    )
    assert "self-hosting" not in joined.lower(), (
        "rendered kit leaks the orchestrator self-hosting rules"
    )

    # The demo project's own values must have been substituted in.
    assert "DEMO-" in joined, "the project's work-item prefix was not substituted"
    assert "demo-project" in joined, "the repo name was not substituted"
    assert "8600" in joined and "8601" in joined, "ports were not substituted"
def test_render_is_a_pure_replace(mod):
    """render substitutes the given placeholders; find_unresolved lists leftovers."""
    values = {"WORK_ITEM_PREFIX": "AB", "PROD_PORT": "9000"}
    template = "prefix {{WORK_ITEM_PREFIX}}-12 on port {{PROD_PORT}}"
    assert mod.render(template, values) == "prefix AB-12 on port 9000"

    leftovers = mod.find_unresolved("a {{LEFT_OVER}} b")
    assert leftovers == ["{{LEFT_OVER}}"]
# --------------------------------------------------------------------------- #
# TC-11 — referential integrity of rendered prompts/AGENTS.md (AC-5)
# --------------------------------------------------------------------------- #
# Matches path-like tokens in text: anything starting with `docs/`, an entry
# under `.openclaw/agents/`, or one of a fixed set of top-level file names.
_PATH_TOKEN = re.compile(
r"(?:docs/[\w./\-]+|\.openclaw/agents/[\w.\-]+|CLAUDE\.md|AGENTS\.md|"
r"CONTRIBUTING\.md|README\.md|CHANGELOG\.md|\.env\.example)"
)
def _static_paths(text: str) -> set[str]:
    """Collect the checkable path tokens that ``_PATH_TOKEN`` finds in ``text``.

    Trailing punctuation is stripped from each token. Tokens that contain any
    of ``<>*{}`` or the literal ``NNN`` are treated as dynamic/illustrative
    and left out.
    """
    cleaned = (raw.rstrip(".,;:)`'\"") for raw in _PATH_TOKEN.findall(text))
    return {
        candidate
        for candidate in cleaned
        if "NNN" not in candidate and not any(ch in candidate for ch in "<>*{}")
    }
def test_tc11_referenced_paths_exist_in_materialised_tree(mod, tmp_path):
    """TC-11: every static path named in the agent prompts / AGENTS.md exists in the kit."""
    dest = tmp_path / "repo"
    mod.materialize_kit(_params(), str(dest))

    agent_names = ("analyst", "architect", "developer", "reviewer", "tester", "deployer")
    sources = [
        os.path.join(dest, ".openclaw", "agents", f"{agent}.md")
        for agent in agent_names
    ]
    sources.append(os.path.join(dest, "AGENTS.md"))

    broken = []
    for src_file in sources:
        with open(src_file, encoding="utf-8") as handle:
            text = handle.read()
        for ref in _static_paths(text):
            target = os.path.join(dest, *ref.split("/"))
            # a reference is satisfied by either a file or a directory
            exists = os.path.isfile(target) or os.path.isdir(target.rstrip("/"))
            if not exists:
                broken.append((os.path.relpath(src_file, dest), ref))
    assert not broken, f"kit references non-existent paths: {broken}"
# --------------------------------------------------------------------------- #
# TC-12 — registry round-trip through the ACTUAL parser (AC-6)
# --------------------------------------------------------------------------- #
def test_tc12_registry_round_trip_through_actual_parser(mod):
    """TC-12: the built entry and the merged registry parse with the real parser."""
    from src.projects import _parse_projects_json

    existing = [
        {
            "plane_project_id": "7a79f0a9-5278-49cd-9007-9a338f238f9c",
            "repo": "enduro-trails",
            "work_item_prefix": "ET",
            "name": "enduro-trails",
        },
        {
            "plane_project_id": "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a",
            "repo": "orchestrator",
            "work_item_prefix": "ORCH",
            "name": "orchestrator",
        },
    ]
    params = _params()
    entry = mod.build_registry_entry(params)
    standalone, merged = mod.merged_projects_json(entry, json.dumps(existing))

    # The standalone entry, wrapped in a JSON array, parses by itself.
    solo = _parse_projects_json(f"[{standalone}]")
    assert solo is not None and len(solo) == 1

    parsed = _parse_projects_json(merged)
    assert parsed is not None and len(parsed) == 3, "merged registry must carry all 3"

    # The two pre-existing entries keep their position and field values.
    for got, exp in zip(parsed, existing):
        assert got.plane_project_id == exp["plane_project_id"]
        assert got.repo == exp["repo"]
        assert got.work_item_prefix == exp["work_item_prefix"]
        assert got.name == exp["name"]

    # The third entry is the new project, built from the source params.
    new = parsed[2]
    assert new.plane_project_id == params["PLANE_PROJECT_ID"]
    assert new.repo == params["REPO"]
    assert new.work_item_prefix == params["WORK_ITEM_PREFIX"]
    assert new.name == params["PROJECT_NAME"]
def test_tc12_merge_is_idempotent_no_duplicates(mod):
    """TC-12: merging an entry into a registry that already holds it adds nothing."""
    from src.projects import _parse_projects_json

    entry = mod.build_registry_entry(_params())
    already_registered = json.dumps([entry])
    _standalone, merged = mod.merged_projects_json(entry, already_registered)

    parsed = _parse_projects_json(merged)
    assert parsed is not None and len(parsed) == 1, (
        "re-merging an already-registered project must not duplicate it"
    )
# --------------------------------------------------------------------------- #
# TC-13 — plan: exact Plane statuses (22) + groups + labels (AC-7)
# --------------------------------------------------------------------------- #
def test_state_groups_match_plane_name_to_key(mod):
    """STATE_GROUPS names equal the _PLANE_NAME_TO_KEY names; terminal groups are pinned."""
    from src.plane_sync import _PLANE_NAME_TO_KEY

    groups = mod.STATE_GROUPS
    assert set(groups) == set(_PLANE_NAME_TO_KEY), (
        "STATE_GROUPS must cover exactly the canonical Plane status names"
    )

    # Code-critical constraints (ADR-001 D5): STOP belongs to the cancelled
    # group (ORCH-090 fail-closed cancel), and only Done/Cancelled/STOP may be
    # terminal - otherwise terminal-detection (ORCH-068) would falsely
    # terminate live tasks.
    assert groups["STOP"] == "cancelled"
    assert groups["Done"] == "completed"
    assert groups["Cancelled"] == "cancelled"

    terminal = set()
    for name, group in groups.items():
        if group in ("completed", "cancelled"):
            terminal.add(name)
    assert terminal == {"Done", "Cancelled", "STOP"}
def test_tc13_plan_covers_all_statuses_and_labels(mod):
    """TC-13: the plan has a PLANNED step per status and label, plus the manual UI steps."""
    from src.plane_sync import _PLANE_NAME_TO_KEY

    plane = FakePlane(mod)
    gitea = FakeGitea()
    report = mod.run_plan(_params(), plane, gitea, webhook_url=_WEBHOOK_URL)

    for name in _PLANE_NAME_TO_KEY:
        state_step = _step(report, f"plane.state:{name}")
        assert state_step.status == mod.PLANNED, f"status {name!r} not planned: {state_step.status}"

    stop = _step(report, "plane.state:STOP")
    assert "cancelled" in stop.detail, "STOP step must pin the cancelled group"

    for label in mod.label_names():
        label_step = _step(report, f"plane.label:{label}")
        assert label_step.status == mod.PLANNED
    assert set(mod.label_names()) == {"autoApprove", "autoDeploy", "Bug"}

    # Known UI-only steps must appear as manual steps, not be dropped (D5).
    for manual_id in ("plane.board-order", "plane.workspace-webhook"):
        assert _step(report, manual_id).status == mod.MANUAL
# --------------------------------------------------------------------------- #
# TC-14 — Plane API refusal degrades to manual-step, never a crash (AC-7)
# --------------------------------------------------------------------------- #
def test_tc14_plane_refusal_becomes_manual_step(mod, tmp_path):
    """TC-14: refused Plane state/label creation yields manual steps and exit code 2."""
    existing_project = {"id": _params()["PLANE_PROJECT_ID"], "identifier": "DEMO"}
    plane = FakePlane(
        mod,
        project=existing_project,
        refuse_create_states=True,
        refuse_create_labels=True,
    )
    existing_hook = {"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}
    gitea = FakeGitea(
        repo={"name": "demo-project", "empty": False},
        hooks=[existing_hook],
    )

    def _noop_git(cmd, cwd):
        # git runner stub: report success without executing anything
        return 0

    report = mod.run_apply(
        _params(),
        plane,
        gitea,
        webhook_url=_WEBHOOK_URL,
        git_runner=_noop_git,
        workdir=str(tmp_path),
    )

    state_steps = [step for step in report.steps if step.id.startswith("plane.state:")]
    assert state_steps and all(s.status == mod.MANUAL for s in state_steps), (
        "refused Plane state creation must degrade to manual-step"
    )
    for step in state_steps:
        assert "ONBOARDING.md" in step.detail, "manual-step must link the runbook"

    label_steps = [step for step in report.steps if step.id.startswith("plane.label:")]
    assert label_steps and all(s.status == mod.MANUAL for s in label_steps)
    assert report.exit_code == 2, "manual steps -> exit code 2"
# --------------------------------------------------------------------------- #
# TC-15 / TC-16 — plan: Gitea layer complete; dry-run mutates NOTHING (AC-8)
# --------------------------------------------------------------------------- #
def test_tc15_plan_contains_gitea_repo_webhook_and_push(mod):
    """TC-15: the plan holds PLANNED steps for the Gitea repo, the webhook and the kit push."""
    report = mod.run_plan(_params(), FakePlane(mod), FakeGitea(), webhook_url=_WEBHOOK_URL)

    assert _step(report, "gitea.repo").status == mod.PLANNED

    hook = _step(report, "gitea.webhook")
    assert hook.status == mod.PLANNED
    for event in ("push", "pull_request", "status"):
        assert event in hook.detail, f"webhook plan must name event {event!r}"
    mentions_secret = "HMAC" in hook.detail or "secret" in hook.detail.lower()
    assert mentions_secret, (
        "webhook plan must mention the HMAC secret (kept out of git)"
    )

    push = _step(report, "kit.push")
    assert push.status == mod.PLANNED
    assert "push" in push.detail.lower()
def test_tc16_plan_is_a_pure_dry_run(mod, monkeypatch):
    """TC-16: run_plan makes no Plane/Gitea mutation and never materialises or pushes."""

    def _boom(*_a, **_kw):
        # any call to a patched function means plan mode left dry-run territory
        raise AssertionError("plan mode touched the disk / git")

    for forbidden in ("materialize_kit", "initial_push"):
        monkeypatch.setattr(mod, forbidden, _boom)

    plane = FakePlane(mod)
    gitea = FakeGitea()
    report = mod.run_plan(_params(), plane, gitea, webhook_url=_WEBHOOK_URL)

    assert plane.mutations == [], "plan made a Plane mutation"
    assert gitea.mutations == [], "plan made a Gitea mutation"
    assert report.steps, "plan produced an empty report"
def test_secret_never_leaks_into_report(mod):
    """The webhook secret passed to run_plan must not appear in the serialised report."""
    secret = "super-secret-hmac-value"
    report = mod.run_plan(
        _params(),
        FakePlane(mod),
        FakeGitea(),
        webhook_url=_WEBHOOK_URL,
        webhook_secret=secret,
    )
    dumped = json.dumps(report.to_dict(), ensure_ascii=False)
    assert secret not in dumped, (
        "the webhook HMAC secret leaked into the report (NFR-3)"
    )
# --------------------------------------------------------------------------- #
# TC-17 — apply is idempotent: existing entities -> skipped(exists) (AC-9)
# --------------------------------------------------------------------------- #
def test_tc17_second_apply_skips_everything_existing(mod, tmp_path):
    """TC-17: applying over a fully provisioned project mutates nothing and skips each step."""
    params = _params()
    plane = FakePlane(
        mod,
        project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
        states=_full_states(mod),
        labels=_full_labels(mod),
    )
    gitea = FakeGitea(
        repo={"name": params["REPO"], "empty": False},
        hooks=[{"id": 7, "active": True, "config": {"url": _WEBHOOK_URL}}],
    )
    git_calls = []

    def _record_git(cmd, cwd):
        # remember every git invocation and report success
        git_calls.append((cmd, cwd))
        return 0

    report = mod.run_apply(
        params,
        plane,
        gitea,
        webhook_url=_WEBHOOK_URL,
        git_runner=_record_git,
        workdir=str(tmp_path),
    )

    assert plane.mutations == [], "idempotent apply must not re-create Plane entities"
    assert gitea.mutations == [], "idempotent apply must not re-create Gitea entities"
    assert git_calls == [], "apply must NEVER push into a non-empty existing repo"

    skipped_ids = ["plane.project"]
    skipped_ids.extend(f"plane.state:{name}" for name in mod.STATE_GROUPS)
    skipped_ids.extend(f"plane.label:{label}" for label in mod.label_names())
    skipped_ids.extend(["gitea.repo", "gitea.webhook"])
    for step_id in skipped_ids:
        assert _step(report, step_id).status == mod.SKIPPED

    assert _step(report, "kit.push").status == mod.MANUAL, (
        "non-empty repo -> kit push degrades to a manual step, never an overwrite"
    )

    summary = report.to_dict()
    for key in ("created", "skipped", "manual"):
        assert key in summary["totals"], f"report totals lack {key!r}"
# --------------------------------------------------------------------------- #
# TC-18 — apply runs no restarts / no prod-.env edits / git only (NFR-2)
# --------------------------------------------------------------------------- #
def test_tc18_fresh_apply_runs_git_only_inside_workdir(mod, tmp_path):
    """TC-18: a fresh apply runs only git, only inside the workdir, and creates repo + hook."""
    plane = FakePlane(mod)
    gitea = FakeGitea()
    calls = []

    def _recording_runner(cmd, cwd):
        # store a list copy of the command together with its working directory
        calls.append((list(cmd), cwd))
        return 0

    report = mod.run_apply(
        _params(),
        plane,
        gitea,
        webhook_url=_WEBHOOK_URL,
        git_runner=_recording_runner,
        workdir=str(tmp_path),
    )

    assert calls, "fresh empty repo: the initial push pipeline must run"
    workdir = str(tmp_path)
    for cmd, cwd in calls:
        assert cmd[0] == "git", f"only git may be executed, got: {cmd}"
        assert cwd and workdir in cwd, (
            f"git must run only inside the materialisation workdir, got cwd={cwd}"
        )

    joined = " ".join(" ".join(argv) for argv, _cwd in calls)
    assert "docker" not in joined and "restart" not in joined

    assert _step(report, "kit.push").status == mod.CREATED
    assert ("create_repo", "admin", "demo-project") in gitea.mutations

    hook_calls = [entry for entry in gitea.mutations if entry[0] == "create_hook"]
    assert hook_calls and hook_calls[0][1] == _WEBHOOK_URL
    assert set(hook_calls[0][2]) == {"push", "pull_request", "status"}
def test_tc18_source_has_no_container_or_env_mutation_ops(mod):
    """TC-18: the script source names no container tooling and never opens a .env for writing."""
    with open(_SCRIPT_PATH, encoding="utf-8") as script:
        source = script.read()

    lowered = source.lower()
    assert "docker" not in lowered, "the script must not touch containers (NFR-2)"
    assert "systemctl" not in lowered
    assert "compose" not in lowered

    # an open(...) call mentioning .env with a mode starting in "w" or "a"
    env_write = re.compile(r"open\([^)]*\.env[^)]*['\"][wa]")
    assert env_write.search(source) is None, (
        "the script must never WRITE any .env (read-only access allowed)"
    )
# --------------------------------------------------------------------------- #
# verify — registry / states / labels / webhook / kit completeness (FR-5)
# --------------------------------------------------------------------------- #
def _verify_files(mod):
    """Build the repo-path -> text mapping handed to FakeGitea in the verify tests.

    Starts from the kit rendered for the default params and adds 16
    placeholder template files plus the three standards files, each with the
    content ``"x"``.
    """
    files = dict(mod.render_kit_in_memory(_params()))
    files.update(
        {f"docs/_templates/{i:02d}-skeleton.md": "x" for i in range(16)}
    )
    files.update(
        {
            f"docs/_standards/{name}": "x"
            for name in ("PIPELINE_DOCS.md", "HANDOFF_PROTOCOL.md", "TRACEABILITY.md")
        }
    )
    return files
def test_verify_all_green(mod):
    """run_verify reports no GAP step when project, states, labels, hook and kit all exist."""
    params = _params()
    plane = FakePlane(
        mod,
        project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
        states=_full_states(mod),
        labels=_full_labels(mod),
    )
    gitea = FakeGitea(
        repo={"name": params["REPO"], "empty": False},
        hooks=[{"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}],
        files=_verify_files(mod),
    )
    # registry holding only this project's entry
    _, merged = mod.merged_projects_json(mod.build_registry_entry(params), "[]")

    report = mod.run_verify(
        params,
        plane,
        gitea,
        webhook_url=_WEBHOOK_URL,
        projects_raw=merged,
    )

    gaps = [step for step in report.steps if step.status == mod.GAP]
    assert not gaps, f"verify reported gaps on a fully onboarded project: {gaps}"
def test_verify_flags_missing_failclosed_statuses(mod):
    """run_verify reports a GAP naming STOP and Confirm Deploy when both states are absent."""
    params = _params()
    withheld = ("STOP", "Confirm Deploy")
    states = [state for state in _full_states(mod) if state["name"] not in withheld]
    plane = FakePlane(
        mod,
        project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
        states=states,
        labels=_full_labels(mod),
    )
    gitea = FakeGitea(
        repo={"name": params["REPO"], "empty": False},
        hooks=[{"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}],
        files=_verify_files(mod),
    )
    # registry holding only this project's entry
    _, merged = mod.merged_projects_json(mod.build_registry_entry(params), "[]")

    report = mod.run_verify(
        params,
        plane,
        gitea,
        webhook_url=_WEBHOOK_URL,
        projects_raw=merged,
    )

    states_step = _step(report, "verify.plane.states")
    assert states_step.status == mod.GAP
    assert "STOP" in states_step.detail and "Confirm Deploy" in states_step.detail, (
        "verify must name the missing fail-closed statuses explicitly"
    )
    assert report.exit_code == 2