developer(ET): auto-commit from developer run_id=587
This commit is contained in:
605
tests/test_onboarding_script.py
Normal file
605
tests/test_onboarding_script.py
Normal file
@@ -0,0 +1,605 @@
|
||||
"""ORCH-009: tests of the operator onboarding CLI (`scripts/onboard_project.py`).
|
||||
|
||||
Covers test-plan TC-02 (live-copy of the canon), TC-09..TC-11 (render /
|
||||
anti-leak / referential integrity), TC-12 (registry round-trip through the
|
||||
actual parser), TC-13..TC-16 (plan: Plane/Gitea completeness + dry-run with
|
||||
zero mutations), TC-17..TC-18 (idempotent & safe apply).
|
||||
|
||||
All tests are deterministic and offline (NFR-5): the Plane/Gitea clients are
|
||||
replaced with in-memory fakes; git is replaced with a recording runner. The
|
||||
script module is loaded via importlib (pattern: tests/test_staging_check_b6.py).
|
||||
"""
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
_SCRIPT_PATH = os.path.join(_REPO_ROOT, "scripts", "onboard_project.py")
|
||||
|
||||
|
||||
def _load_module():
|
||||
spec = importlib.util.spec_from_file_location("onboard_project", _SCRIPT_PATH)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mod():
|
||||
return _load_module()
|
||||
|
||||
|
||||
_WEBHOOK_URL = "https://orchestrator.example.org/webhook/gitea"
|
||||
|
||||
|
||||
def _params(**over):
|
||||
p = {
|
||||
"PROJECT_NAME": "Demo Project",
|
||||
"PROJECT_DESCRIPTION": "Демо-проект для проверки онбординга",
|
||||
"REPO": "demo-project",
|
||||
"GITEA_OWNER": "admin",
|
||||
"WORK_ITEM_PREFIX": "DEMO",
|
||||
"PLANE_PROJECT_ID": "11111111-2222-3333-4444-555555555555",
|
||||
"STACK": "Python 3.12 + FastAPI + SQLite",
|
||||
"TEST_CMD": "pytest tests/ -q",
|
||||
"PROD_PORT": "8600",
|
||||
"STAGING_PORT": "8601",
|
||||
}
|
||||
p.update(over)
|
||||
return p
|
||||
|
||||
|
||||
def _step(report, step_id):
|
||||
matches = [s for s in report.steps if s.id == step_id]
|
||||
assert matches, f"report has no step {step_id!r}: {[s.id for s in report.steps]}"
|
||||
return matches[0]
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Fakes — the only network touchpoints of the script, replaced in-memory.
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
class FakePlane:
|
||||
def __init__(self, mod, project=None, states=None, labels=None,
|
||||
refuse_create_states=False, refuse_create_labels=False,
|
||||
refuse_create_project=False):
|
||||
self._mod = mod
|
||||
self.project = project
|
||||
self.states = list(states or [])
|
||||
self.labels = list(labels or [])
|
||||
self.mutations = []
|
||||
self.refuse_create_states = refuse_create_states
|
||||
self.refuse_create_labels = refuse_create_labels
|
||||
self.refuse_create_project = refuse_create_project
|
||||
|
||||
# GET probes
|
||||
def get_project(self, project_id):
|
||||
if self.project and self.project.get("id") == project_id:
|
||||
return self.project
|
||||
return None
|
||||
|
||||
def find_project_by_identifier(self, identifier):
|
||||
if self.project and self.project.get("identifier") == identifier:
|
||||
return self.project
|
||||
return None
|
||||
|
||||
def list_states(self, project_id):
|
||||
return list(self.states)
|
||||
|
||||
def list_labels(self, project_id):
|
||||
return list(self.labels)
|
||||
|
||||
# mutations
|
||||
def create_project(self, name, identifier):
|
||||
if self.refuse_create_project:
|
||||
raise self._mod.ManualStep("Plane CE: projects API not available")
|
||||
self.mutations.append(("create_project", name, identifier))
|
||||
self.project = {"id": "plane-uuid-created", "name": name, "identifier": identifier}
|
||||
return self.project
|
||||
|
||||
def create_state(self, project_id, name, group):
|
||||
if self.refuse_create_states:
|
||||
raise self._mod.ManualStep("Plane CE: states API not available")
|
||||
self.mutations.append(("create_state", name, group))
|
||||
state = {"id": f"uuid-{name}", "name": name, "group": group}
|
||||
self.states.append(state)
|
||||
return state
|
||||
|
||||
def create_label(self, project_id, name):
|
||||
if self.refuse_create_labels:
|
||||
raise self._mod.ManualStep("Plane CE: labels API not available")
|
||||
self.mutations.append(("create_label", name))
|
||||
label = {"id": f"uuid-{name}", "name": name}
|
||||
self.labels.append(label)
|
||||
return label
|
||||
|
||||
|
||||
class FakeGitea:
|
||||
def __init__(self, repo=None, hooks=None, files=None):
|
||||
self.repo = repo
|
||||
self.hooks = list(hooks or [])
|
||||
self.files = dict(files or {}) # repo path -> text (for verify)
|
||||
self.mutations = []
|
||||
|
||||
def get_repo(self, owner, repo):
|
||||
return self.repo
|
||||
|
||||
def list_hooks(self, owner, repo):
|
||||
return list(self.hooks)
|
||||
|
||||
def create_repo(self, owner, name, description=""):
|
||||
self.mutations.append(("create_repo", owner, name))
|
||||
self.repo = {"name": name, "owner": {"login": owner}, "empty": True}
|
||||
return self.repo
|
||||
|
||||
def create_hook(self, owner, repo, url, secret, events):
|
||||
self.mutations.append(("create_hook", url, tuple(events)))
|
||||
hook = {"id": 1, "active": True, "config": {"url": url}, "events": list(events)}
|
||||
self.hooks.append(hook)
|
||||
return hook
|
||||
|
||||
# verify helpers
|
||||
def get_file_text(self, owner, repo, path):
|
||||
return self.files.get(path)
|
||||
|
||||
def list_dir(self, owner, repo, path):
|
||||
prefix = path.rstrip("/") + "/"
|
||||
names = {
|
||||
rel[len(prefix):].split("/", 1)[0]
|
||||
for rel in self.files
|
||||
if rel.startswith(prefix)
|
||||
}
|
||||
return sorted(names) or None
|
||||
|
||||
|
||||
def _full_states(mod):
|
||||
return [
|
||||
{"id": f"uuid-{name}", "name": name, "group": group}
|
||||
for name, group in mod.STATE_GROUPS.items()
|
||||
]
|
||||
|
||||
|
||||
def _full_labels(mod):
|
||||
return [{"id": f"uuid-{name}", "name": name} for name in mod.label_names()]
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-02 — materialisation live-copies the canon (BR-2 / D3)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc02_materialise_live_copies_canon(mod, tmp_path):
|
||||
dest = tmp_path / "repo"
|
||||
written = mod.materialize_kit(_params(), str(dest))
|
||||
assert written, "materialize_kit wrote nothing"
|
||||
|
||||
templates = os.listdir(dest / "docs" / "_templates")
|
||||
standards = os.listdir(dest / "docs" / "_standards")
|
||||
assert len(templates) >= 16, f"expected >=16 canonical skeletons, got {len(templates)}"
|
||||
assert len(standards) >= 3, f"expected >=3 standards, got {len(standards)}"
|
||||
|
||||
# Verbatim copy — byte-equal to the live canon of the orchestrator checkout.
|
||||
for rel in ("PIPELINE_DOCS.md", "HANDOFF_PROTOCOL.md", "TRACEABILITY.md"):
|
||||
src_path = os.path.join(_REPO_ROOT, "docs", "_standards", rel)
|
||||
with open(src_path, encoding="utf-8") as f:
|
||||
canon = f.read()
|
||||
with open(dest / "docs" / "_standards" / rel, encoding="utf-8") as f:
|
||||
copied = f.read()
|
||||
assert copied == canon, f"{rel} must be live-copied verbatim (BR-2)"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-09 / TC-10 — render: no unresolved placeholders, no orc leaks (AC-5)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc09_render_resolves_all_placeholders(mod):
|
||||
rendered = mod.render_kit_in_memory(_params())
|
||||
assert rendered, "render_kit_in_memory returned nothing"
|
||||
for rel, content in rendered.items():
|
||||
unresolved = mod.find_unresolved(content)
|
||||
assert not unresolved, f"{rel} keeps unresolved placeholders: {unresolved}"
|
||||
|
||||
|
||||
def test_tc10_no_orchestrator_specific_leaks(mod):
|
||||
rendered = mod.render_kit_in_memory(_params())
|
||||
joined = "\n".join(rendered.values())
|
||||
assert re.search(r"ORCH-\d", joined) is None, (
|
||||
"rendered kit leaks an ORCH-NNN work-item literal where the project "
|
||||
"prefix belongs (TC-10)"
|
||||
)
|
||||
assert "8500" not in joined and "8501" not in joined, (
|
||||
"rendered kit leaks the orchestrator prod/staging ports"
|
||||
)
|
||||
assert "self-hosting" not in joined.lower(), (
|
||||
"rendered kit leaks the orchestrator self-hosting rules"
|
||||
)
|
||||
# The project's own parameters actually got substituted.
|
||||
assert "DEMO-" in joined, "the project's work-item prefix was not substituted"
|
||||
assert "demo-project" in joined, "the repo name was not substituted"
|
||||
assert "8600" in joined and "8601" in joined, "ports were not substituted"
|
||||
|
||||
|
||||
def test_render_is_a_pure_replace(mod):
|
||||
text = "prefix {{WORK_ITEM_PREFIX}}-12 on port {{PROD_PORT}}"
|
||||
out = mod.render(text, {"WORK_ITEM_PREFIX": "AB", "PROD_PORT": "9000"})
|
||||
assert out == "prefix AB-12 on port 9000"
|
||||
assert mod.find_unresolved("a {{LEFT_OVER}} b") == ["{{LEFT_OVER}}"]
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-11 — referential integrity of rendered prompts/AGENTS.md (AC-5)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
_PATH_TOKEN = re.compile(
|
||||
r"(?:docs/[\w./\-]+|\.openclaw/agents/[\w.\-]+|CLAUDE\.md|AGENTS\.md|"
|
||||
r"CONTRIBUTING\.md|README\.md|CHANGELOG\.md|\.env\.example)"
|
||||
)
|
||||
|
||||
|
||||
def _static_paths(text: str) -> set[str]:
|
||||
out = set()
|
||||
for token in _PATH_TOKEN.findall(text):
|
||||
token = token.rstrip(".,;:)`'\"")
|
||||
# dynamic/illustrative tokens are not checkable paths
|
||||
if any(ch in token for ch in "<>*{}") or "NNN" in token:
|
||||
continue
|
||||
out.add(token)
|
||||
return out
|
||||
|
||||
|
||||
def test_tc11_referenced_paths_exist_in_materialised_tree(mod, tmp_path):
|
||||
dest = tmp_path / "repo"
|
||||
mod.materialize_kit(_params(), str(dest))
|
||||
|
||||
sources = [
|
||||
os.path.join(dest, ".openclaw", "agents", f"{a}.md")
|
||||
for a in ("analyst", "architect", "developer", "reviewer", "tester", "deployer")
|
||||
]
|
||||
sources.append(os.path.join(dest, "AGENTS.md"))
|
||||
|
||||
broken = []
|
||||
for src_file in sources:
|
||||
with open(src_file, encoding="utf-8") as f:
|
||||
for ref in _static_paths(f.read()):
|
||||
target = os.path.join(dest, *ref.split("/"))
|
||||
if not (os.path.isfile(target) or os.path.isdir(target.rstrip("/"))):
|
||||
broken.append((os.path.relpath(src_file, dest), ref))
|
||||
assert not broken, f"kit references non-existent paths: {broken}"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-12 — registry round-trip through the ACTUAL parser (AC-6)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc12_registry_round_trip_through_actual_parser(mod):
|
||||
from src.projects import _parse_projects_json
|
||||
|
||||
existing = [
|
||||
{
|
||||
"plane_project_id": "7a79f0a9-5278-49cd-9007-9a338f238f9c",
|
||||
"repo": "enduro-trails",
|
||||
"work_item_prefix": "ET",
|
||||
"name": "enduro-trails",
|
||||
},
|
||||
{
|
||||
"plane_project_id": "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a",
|
||||
"repo": "orchestrator",
|
||||
"work_item_prefix": "ORCH",
|
||||
"name": "orchestrator",
|
||||
},
|
||||
]
|
||||
params = _params()
|
||||
entry = mod.build_registry_entry(params)
|
||||
standalone, merged = mod.merged_projects_json(entry, json.dumps(existing))
|
||||
|
||||
# standalone entry parses on its own
|
||||
solo = _parse_projects_json(f"[{standalone}]")
|
||||
assert solo is not None and len(solo) == 1
|
||||
|
||||
parsed = _parse_projects_json(merged)
|
||||
assert parsed is not None and len(parsed) == 3, "merged registry must carry all 3"
|
||||
# existing entries survive verbatim (no loss, no distortion)
|
||||
for i, exp in enumerate(existing):
|
||||
assert parsed[i].plane_project_id == exp["plane_project_id"]
|
||||
assert parsed[i].repo == exp["repo"]
|
||||
assert parsed[i].work_item_prefix == exp["work_item_prefix"]
|
||||
assert parsed[i].name == exp["name"]
|
||||
# the new entry carries the source params
|
||||
new = parsed[2]
|
||||
assert new.plane_project_id == params["PLANE_PROJECT_ID"]
|
||||
assert new.repo == params["REPO"]
|
||||
assert new.work_item_prefix == params["WORK_ITEM_PREFIX"]
|
||||
assert new.name == params["PROJECT_NAME"]
|
||||
|
||||
|
||||
def test_tc12_merge_is_idempotent_no_duplicates(mod):
|
||||
from src.projects import _parse_projects_json
|
||||
|
||||
params = _params()
|
||||
entry = mod.build_registry_entry(params)
|
||||
once = json.dumps([entry])
|
||||
_standalone, merged = mod.merged_projects_json(entry, once)
|
||||
parsed = _parse_projects_json(merged)
|
||||
assert parsed is not None and len(parsed) == 1, (
|
||||
"re-merging an already-registered project must not duplicate it"
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-13 — plan: exact Plane statuses (22) + groups + labels (AC-7)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_state_groups_match_plane_name_to_key(mod):
|
||||
from src.plane_sync import _PLANE_NAME_TO_KEY
|
||||
|
||||
assert set(mod.STATE_GROUPS) == set(_PLANE_NAME_TO_KEY), (
|
||||
"STATE_GROUPS must cover exactly the canonical Plane status names"
|
||||
)
|
||||
# Code-critical constraints (ADR-001 D5): STOP joins the cancelled group
|
||||
# (ORCH-090 fail-closed cancel); only Done/Cancelled/STOP are terminal —
|
||||
# otherwise terminal-detection (ORCH-068) falsely terminates live tasks.
|
||||
assert mod.STATE_GROUPS["STOP"] == "cancelled"
|
||||
assert mod.STATE_GROUPS["Done"] == "completed"
|
||||
assert mod.STATE_GROUPS["Cancelled"] == "cancelled"
|
||||
terminal = {n for n, g in mod.STATE_GROUPS.items() if g in ("completed", "cancelled")}
|
||||
assert terminal == {"Done", "Cancelled", "STOP"}
|
||||
|
||||
|
||||
def test_tc13_plan_covers_all_statuses_and_labels(mod):
|
||||
from src.plane_sync import _PLANE_NAME_TO_KEY
|
||||
|
||||
plane = FakePlane(mod)
|
||||
gitea = FakeGitea()
|
||||
report = mod.run_plan(_params(), plane, gitea, webhook_url=_WEBHOOK_URL)
|
||||
|
||||
for name in _PLANE_NAME_TO_KEY:
|
||||
step = _step(report, f"plane.state:{name}")
|
||||
assert step.status == mod.PLANNED, f"status {name!r} not planned: {step.status}"
|
||||
stop = _step(report, "plane.state:STOP")
|
||||
assert "cancelled" in stop.detail, "STOP step must pin the cancelled group"
|
||||
|
||||
for label in mod.label_names():
|
||||
assert _step(report, f"plane.label:{label}").status == mod.PLANNED
|
||||
assert set(mod.label_names()) == {"autoApprove", "autoDeploy", "Bug"}
|
||||
|
||||
# known UI-only steps are flagged manual, never silently dropped (D5)
|
||||
assert _step(report, "plane.board-order").status == mod.MANUAL
|
||||
assert _step(report, "plane.workspace-webhook").status == mod.MANUAL
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-14 — Plane API refusal degrades to manual-step, never a crash (AC-7)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc14_plane_refusal_becomes_manual_step(mod, tmp_path):
|
||||
plane = FakePlane(
|
||||
mod,
|
||||
project={"id": _params()["PLANE_PROJECT_ID"], "identifier": "DEMO"},
|
||||
refuse_create_states=True,
|
||||
refuse_create_labels=True,
|
||||
)
|
||||
gitea = FakeGitea(
|
||||
repo={"name": "demo-project", "empty": False},
|
||||
hooks=[{"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}],
|
||||
)
|
||||
report = mod.run_apply(
|
||||
_params(), plane, gitea,
|
||||
webhook_url=_WEBHOOK_URL, git_runner=lambda cmd, cwd: 0,
|
||||
workdir=str(tmp_path),
|
||||
)
|
||||
state_steps = [s for s in report.steps if s.id.startswith("plane.state:")]
|
||||
assert state_steps and all(s.status == mod.MANUAL for s in state_steps), (
|
||||
"refused Plane state creation must degrade to manual-step"
|
||||
)
|
||||
for s in state_steps:
|
||||
assert "ONBOARDING.md" in s.detail, "manual-step must link the runbook"
|
||||
label_steps = [s for s in report.steps if s.id.startswith("plane.label:")]
|
||||
assert label_steps and all(s.status == mod.MANUAL for s in label_steps)
|
||||
assert report.exit_code == 2, "manual steps -> exit code 2"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-15 / TC-16 — plan: Gitea layer complete; dry-run mutates NOTHING (AC-8)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc15_plan_contains_gitea_repo_webhook_and_push(mod):
|
||||
plane = FakePlane(mod)
|
||||
gitea = FakeGitea()
|
||||
report = mod.run_plan(_params(), plane, gitea, webhook_url=_WEBHOOK_URL)
|
||||
|
||||
assert _step(report, "gitea.repo").status == mod.PLANNED
|
||||
hook = _step(report, "gitea.webhook")
|
||||
assert hook.status == mod.PLANNED
|
||||
for event in ("push", "pull_request", "status"):
|
||||
assert event in hook.detail, f"webhook plan must name event {event!r}"
|
||||
assert "HMAC" in hook.detail or "secret" in hook.detail.lower(), (
|
||||
"webhook plan must mention the HMAC secret (kept out of git)"
|
||||
)
|
||||
push = _step(report, "kit.push")
|
||||
assert push.status == mod.PLANNED
|
||||
assert "push" in push.detail.lower()
|
||||
|
||||
|
||||
def test_tc16_plan_is_a_pure_dry_run(mod, monkeypatch):
|
||||
plane = FakePlane(mod)
|
||||
gitea = FakeGitea()
|
||||
|
||||
def _boom(*_a, **_kw): # plan must never materialise or push
|
||||
raise AssertionError("plan mode touched the disk / git")
|
||||
|
||||
monkeypatch.setattr(mod, "materialize_kit", _boom)
|
||||
monkeypatch.setattr(mod, "initial_push", _boom)
|
||||
|
||||
report = mod.run_plan(_params(), plane, gitea, webhook_url=_WEBHOOK_URL)
|
||||
assert plane.mutations == [], "plan made a Plane mutation"
|
||||
assert gitea.mutations == [], "plan made a Gitea mutation"
|
||||
assert report.steps, "plan produced an empty report"
|
||||
|
||||
|
||||
def test_secret_never_leaks_into_report(mod):
|
||||
plane = FakePlane(mod)
|
||||
gitea = FakeGitea()
|
||||
report = mod.run_plan(
|
||||
_params(), plane, gitea, webhook_url=_WEBHOOK_URL,
|
||||
webhook_secret="super-secret-hmac-value",
|
||||
)
|
||||
dumped = json.dumps(report.to_dict(), ensure_ascii=False)
|
||||
assert "super-secret-hmac-value" not in dumped, (
|
||||
"the webhook HMAC secret leaked into the report (NFR-3)"
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-17 — apply is idempotent: existing entities -> skipped(exists) (AC-9)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc17_second_apply_skips_everything_existing(mod, tmp_path):
|
||||
params = _params()
|
||||
plane = FakePlane(
|
||||
mod,
|
||||
project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
|
||||
states=_full_states(mod),
|
||||
labels=_full_labels(mod),
|
||||
)
|
||||
gitea = FakeGitea(
|
||||
repo={"name": params["REPO"], "empty": False},
|
||||
hooks=[{"id": 7, "active": True, "config": {"url": _WEBHOOK_URL}}],
|
||||
)
|
||||
git_calls = []
|
||||
report = mod.run_apply(
|
||||
params, plane, gitea, webhook_url=_WEBHOOK_URL,
|
||||
git_runner=lambda cmd, cwd: git_calls.append((cmd, cwd)) or 0,
|
||||
workdir=str(tmp_path),
|
||||
)
|
||||
|
||||
assert plane.mutations == [], "idempotent apply must not re-create Plane entities"
|
||||
assert gitea.mutations == [], "idempotent apply must not re-create Gitea entities"
|
||||
assert git_calls == [], "apply must NEVER push into a non-empty existing repo"
|
||||
|
||||
assert _step(report, "plane.project").status == mod.SKIPPED
|
||||
for name in mod.STATE_GROUPS:
|
||||
assert _step(report, f"plane.state:{name}").status == mod.SKIPPED
|
||||
for label in mod.label_names():
|
||||
assert _step(report, f"plane.label:{label}").status == mod.SKIPPED
|
||||
assert _step(report, "gitea.repo").status == mod.SKIPPED
|
||||
assert _step(report, "gitea.webhook").status == mod.SKIPPED
|
||||
assert _step(report, "kit.push").status == mod.MANUAL, (
|
||||
"non-empty repo -> kit push degrades to a manual step, never an overwrite"
|
||||
)
|
||||
|
||||
summary = report.to_dict()
|
||||
for key in ("created", "skipped", "manual"):
|
||||
assert key in summary["totals"], f"report totals lack {key!r}"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TC-18 — apply runs no restarts / no prod-.env edits / git only (NFR-2)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_tc18_fresh_apply_runs_git_only_inside_workdir(mod, tmp_path):
|
||||
params = _params()
|
||||
plane = FakePlane(mod)
|
||||
gitea = FakeGitea()
|
||||
calls = []
|
||||
|
||||
def recorder(cmd, cwd):
|
||||
calls.append((list(cmd), cwd))
|
||||
return 0
|
||||
|
||||
report = mod.run_apply(
|
||||
params, plane, gitea, webhook_url=_WEBHOOK_URL,
|
||||
git_runner=recorder, workdir=str(tmp_path),
|
||||
)
|
||||
|
||||
assert calls, "fresh empty repo: the initial push pipeline must run"
|
||||
for cmd, cwd in calls:
|
||||
assert cmd[0] == "git", f"only git may be executed, got: {cmd}"
|
||||
assert cwd and str(tmp_path) in cwd, (
|
||||
f"git must run only inside the materialisation workdir, got cwd={cwd}"
|
||||
)
|
||||
joined = " ".join(" ".join(c) for c, _ in calls)
|
||||
assert "docker" not in joined and "restart" not in joined
|
||||
|
||||
assert _step(report, "kit.push").status == mod.CREATED
|
||||
assert ("create_repo", "admin", "demo-project") in gitea.mutations
|
||||
hook_calls = [m for m in gitea.mutations if m[0] == "create_hook"]
|
||||
assert hook_calls and hook_calls[0][1] == _WEBHOOK_URL
|
||||
assert set(hook_calls[0][2]) == {"push", "pull_request", "status"}
|
||||
|
||||
|
||||
def test_tc18_source_has_no_container_or_env_mutation_ops(mod):
|
||||
with open(_SCRIPT_PATH, encoding="utf-8") as f:
|
||||
source = f.read()
|
||||
lowered = source.lower()
|
||||
assert "docker" not in lowered, "the script must not touch containers (NFR-2)"
|
||||
assert "systemctl" not in lowered
|
||||
assert "compose" not in lowered
|
||||
assert re.search(r"open\([^)]*\.env[^)]*['\"][wa]", source) is None, (
|
||||
"the script must never WRITE any .env (read-only access allowed)"
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# verify — registry / states / labels / webhook / kit completeness (FR-5)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _verify_files(mod):
|
||||
params = _params()
|
||||
rendered = mod.render_kit_in_memory(params)
|
||||
files = dict(rendered)
|
||||
for i in range(16):
|
||||
files[f"docs/_templates/{i:02d}-skeleton.md"] = "x"
|
||||
for name in ("PIPELINE_DOCS.md", "HANDOFF_PROTOCOL.md", "TRACEABILITY.md"):
|
||||
files[f"docs/_standards/{name}"] = "x"
|
||||
return files
|
||||
|
||||
|
||||
def test_verify_all_green(mod):
|
||||
params = _params()
|
||||
plane = FakePlane(
|
||||
mod,
|
||||
project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
|
||||
states=_full_states(mod),
|
||||
labels=_full_labels(mod),
|
||||
)
|
||||
gitea = FakeGitea(
|
||||
repo={"name": params["REPO"], "empty": False},
|
||||
hooks=[{"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}],
|
||||
files=_verify_files(mod),
|
||||
)
|
||||
entry = mod.build_registry_entry(params)
|
||||
_, merged = mod.merged_projects_json(entry, "[]")
|
||||
report = mod.run_verify(
|
||||
params, plane, gitea, webhook_url=_WEBHOOK_URL, projects_raw=merged,
|
||||
)
|
||||
gaps = [s for s in report.steps if s.status == mod.GAP]
|
||||
assert not gaps, f"verify reported gaps on a fully onboarded project: {gaps}"
|
||||
|
||||
|
||||
def test_verify_flags_missing_failclosed_statuses(mod):
|
||||
params = _params()
|
||||
states = [s for s in _full_states(mod) if s["name"] not in ("STOP", "Confirm Deploy")]
|
||||
plane = FakePlane(
|
||||
mod,
|
||||
project={"id": params["PLANE_PROJECT_ID"], "identifier": "DEMO"},
|
||||
states=states,
|
||||
labels=_full_labels(mod),
|
||||
)
|
||||
gitea = FakeGitea(
|
||||
repo={"name": params["REPO"], "empty": False},
|
||||
hooks=[{"id": 1, "active": True, "config": {"url": _WEBHOOK_URL}}],
|
||||
files=_verify_files(mod),
|
||||
)
|
||||
entry = mod.build_registry_entry(params)
|
||||
_, merged = mod.merged_projects_json(entry, "[]")
|
||||
report = mod.run_verify(
|
||||
params, plane, gitea, webhook_url=_WEBHOOK_URL, projects_raw=merged,
|
||||
)
|
||||
states_step = _step(report, "verify.plane.states")
|
||||
assert states_step.status == mod.GAP
|
||||
assert "STOP" in states_step.detail and "Confirm Deploy" in states_step.detail, (
|
||||
"verify must name the missing fail-closed statuses explicitly"
|
||||
)
|
||||
assert report.exit_code == 2
|
||||
Reference in New Issue
Block a user