feat(pipeline): add deploy-staging gate before prod deploy (ORCH-35) #31

Merged
admin merged 2 commits from feature/ORCH-35-staging-gate into main 2026-06-05 10:43:38 +03:00
6 changed files with 520 additions and 4 deletions

View File

@@ -0,0 +1,67 @@
# Deployer Agent
You are the **Deployer** agent in the orchestrator pipeline. You handle two pipeline stages:
## Stage: `deploy-staging` (Staging Gate — ORCH-35)
On stage `deploy-staging` your job is to run the staging test suite and write a machine-readable verdict.
### Steps:
1. Run the staging test suite against the live staging environment:
```bash
python3 scripts/staging_check.py --base-url http://localhost:8501 --mode stub
```
2. Check the exit code:
- Exit code **0** = all tests PASS → `staging_status: SUCCESS`
- Exit code **non-zero** = tests FAILED → `staging_status: FAILED`
3. Write the verdict to `docs/work-items/<work_item_id>/15-staging-log.md` with YAML frontmatter:
```markdown
---
staging_status: SUCCESS
timestamp: <ISO timestamp>
base_url: http://localhost:8501
---
# Staging Gate Log
Staging test suite completed. All checks passed.
```
Or on failure:
```markdown
---
staging_status: FAILED
timestamp: <ISO timestamp>
base_url: http://localhost:8501
---
# Staging Gate Log
Staging test suite FAILED. See details below.
<paste test output here>
```
4. Merge `15-staging-log.md` into `main` (commit + push, same as deploy log pattern).
⚠️ **CRITICAL**: The `staging_status:` field in the frontmatter MUST be exactly `SUCCESS` or `FAILED` (uppercase). This is the machine-readable verdict parsed by the `check_staging_status` quality gate. No other values are accepted.
---
## Stage: `deploy` (Production Deploy — ORCH-36, future)
On stage `deploy` your job is to perform (or simulate) the production deployment and write a machine-readable verdict to `docs/work-items/<work_item_id>/14-deploy-log.md` with frontmatter field `deploy_status: SUCCESS|FAILED`.
This stage is only reached if the staging gate (`deploy-staging`) passed with `staging_status: SUCCESS`.
⚠️ **CRITICAL**: Do NOT trigger real production deploys unless explicitly instructed. Real docker/SSH deploys are handled by `scripts/orchestrator-deploy-hook.sh` (ORCH-36).
---
## General Rules
- Always write machine-readable YAML frontmatter — the quality gates parse ONLY the frontmatter fields, never the body prose.
- Never push directly to `main`. Always use a PR or the artifact merge pattern.
- Never modify `.env`, `.env.staging`, `docker-compose.yml`, or production infrastructure.

View File

@@ -440,6 +440,130 @@ def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None)
return False, "Deploy log not found (14-deploy-log.md)"
# ---------------------------------------------------------------------------
# Self-hosting detection: staging-infra (localhost:8501) exists ONLY for the
# orchestrator repo itself (self-hosting). Other repos have no staging instance
# and their deployer prompts know nothing about it -- the gate must be a no-op
# for them. The repo value is the plain gitea repo name (ProjectConfig.repo),
# matching what _run_qg/advance_stage pass in. See ORCH-35 / PR #31.
# ---------------------------------------------------------------------------
SELF_HOSTING_REPO = "orchestrator"
def is_self_hosting_repo(repo: str) -> bool:
"""Return True iff repo is the self-hosted orchestrator (has staging infra).
Comparison is case-insensitive and strips whitespace for safety, but in
practice repo comes from the gitea webhook payload .repository.name which
is always lowercase (confirmed via projects.py registry entry).
"""
return (repo or "").strip().lower() == SELF_HOSTING_REPO.lower()
def _parse_staging_status(content: str) -> tuple[bool, str]:
"""Parse a 15-staging-log.md body and map its `staging_status:` frontmatter to a
quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose.
staging_status: SUCCESS -> (True, "Staging status: SUCCESS")
staging_status: FAILED -> (False, "Staging status: FAILED")
missing field / no frontmatter / bad YAML -> (False, <reason>)
"""
import yaml
status = None
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as e:
return False, f"Invalid YAML frontmatter in staging log: {e}"
status = str(fm.get("staging_status", "")).upper().strip()
if status == "SUCCESS":
return True, "Staging status: SUCCESS"
if status == "FAILED":
return False, "Staging status: FAILED"
return False, f"No machine-readable staging_status in frontmatter (got: {status!r})"
def _staging_log_from_main(repo: str, work_item_id: str) -> str | None:
"""Best-effort read of 15-staging-log.md from origin/main on the shared clone.
The deployer writes 15-staging-log.md and merges the staging artifacts into main
via a separate PR (mirroring the deploy-log pattern), so the file lands in
origin/main, NOT in the feature branch worktree the gate normally reads.
This recovers it from main.
Degrades gracefully: any git failure (no clone, network/fetch error, file
absent in main) returns None instead of raising, so the caller falls back to
the plain "not found" verdict. Never raises.
"""
repo_clone = os.path.join(settings.repos_dir, repo)
if not os.path.isdir(os.path.join(repo_clone, ".git")):
return None
rel = f"docs/work-items/{work_item_id}/15-staging-log.md"
try:
# Refresh origin/main so we see freshly-merged staging artifacts.
subprocess.run(
["git", "-C", repo_clone, "fetch", "origin", "main"],
check=False, capture_output=True, timeout=30,
)
show = subprocess.run(
["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
check=False, capture_output=True, text=True, timeout=15,
)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("staging-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
return None
if show.returncode != 0:
return None
return show.stdout
def check_staging_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
"""
Gate the deploy-staging -> deploy transition on the deployer's machine-readable
verdict in 15-staging-log.md frontmatter (staging_status: SUCCESS|FAILED).
ORCH-35 conditional gate (Variant A):
- Non-self-hosting repos (anything other than "orchestrator") have no staging
instance and no deployer knowledge of it -> gate is an immediate pass.
- Self-hosting repo ("orchestrator") -> real check: reads ONLY the machine-
readable staging_status: field from YAML frontmatter, never body prose.
Mirrors check_deploy_status (БАГ 8) for the self-hosting path.
Lookup order (self-hosting only): worktree -> origin/main -> not found.
Returns:
(True, "Staging gate N/A for <repo>") -> non-self-hosting repo (instant pass)
(True, ...) -> staging_status: SUCCESS (self-hosting path)
(False, ...) -> staging_status: FAILED, missing field, or no frontmatter
"""
# Variant A: non-self-hosting repos have no staging infra -- skip entirely.
if not is_self_hosting_repo(repo):
return True, f"Staging gate N/A for {repo}"
# Self-hosting (orchestrator) path: real verdict check.
repo_path = _repo_path(repo, branch)
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/15-staging-log.md")
if os.path.isfile(log_path):
try:
with open(log_path, "r") as f:
content = f.read()
except OSError as e:
return False, f"Error reading staging log: {e}"
return _parse_staging_status(content)
# Not in the feature worktree -- the deployer may have merged it into main.
main_content = _staging_log_from_main(repo, work_item_id)
if main_content is not None:
return _parse_staging_status(main_content)
return False, "Staging log not found (15-staging-log.md)"
# Registry for dynamic lookup by name
QG_CHECKS = {
"check_analysis_approved": check_analysis_approved,
@@ -451,4 +575,5 @@ QG_CHECKS = {
"check_reviewer_verdict": check_reviewer_verdict,
"check_tests_local": check_tests_local,
"check_deploy_status": check_deploy_status,
"check_staging_status": check_staging_status,
}

View File

@@ -517,6 +517,32 @@ def _handle_qg_failure_rollbacks(
f"(job_id={new_job})"
)
# ORCH-35: deployer staging verdict FAILED -> roll deploy-staging back to development.
# Staging-провал = код плох; откат на development по образцу БАГ-8 (deploy->development).
# НЕ трогает ветку check_deploy_status ниже.
if agent == "deployer" and qg_name == "check_staging_status":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_blocked(work_item_id)
notify_qg_failure(task_id, "deploy-staging", "check_staging_status", reason)
plane_add_comment(
work_item_id,
f"\u274c Staging gate FAILED ({reason}). Rolled back to development. "
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="deployer",
)
send_telegram(
f"\U0001f6a8 {work_item_id}: Staging FAILED ({reason}). "
f"Rolled back to development. Needs fix."
)
result.alerted = True
logger.error(
f"Task {task_id}: deployer staging verdict FAILED, rolled back deploy-staging -> "
f"development ({reason})"
)
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
# The launcher's exit_code-based guard (launcher.py:475) never fires because
# the LLM process exit code is always 0; this gate fires on the machine-readable

View File

@@ -1,7 +1,7 @@
"""Stage machine for orchestrator pipeline.
Stages:
created → analysis → architecture → development → review → testing → deploy → done
created → analysis → architecture → development → review → testing → deploy-staging → deploy → done
Each stage defines:
- next: the stage to advance to
@@ -15,8 +15,9 @@ STAGE_TRANSITIONS = {
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
"testing": {"next": "deploy-staging", "agent": "deployer", "qg": "check_tests_passed"},
"deploy-staging": {"next": "deploy", "agent": "deployer", "qg": "check_staging_status"},
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
"done": {"next": None, "agent": None, "qg": None},
}

View File

@@ -19,6 +19,7 @@ from src.qg.checks import (
check_tests_passed,
check_tests_local,
check_deploy_status,
check_staging_status,
)
from src.stages import get_qg_for_stage
@@ -448,3 +449,185 @@ class TestCheckTestsLocal:
assert "../../tests/" in cmd
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")
class TestCheckStagingStatus:
"""ORCH-35 conditional gate (Variant A): deploy-staging gate is active ONLY for
the self-hosting orchestrator repo (has staging infra on localhost:8501). All
other repos pass immediately with "Staging gate N/A for <repo>".
Self-hosting path: reads machine-readable staging_status: from 15-staging-log.md
frontmatter. Mirrors check_deploy_status pattern.
"""
@pytest.fixture()
def orch_dir(self, tmp_path, monkeypatch):
"""Temp orchestrator repo dir (self-hosting)."""
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
d = tmp_path / "orchestrator"
d.mkdir(exist_ok=True)
return d
def _write_log(self, repo_dir, content, wi="ORCH-035"):
wi_dir = repo_dir / "docs" / "work-items" / wi
wi_dir.mkdir(parents=True, exist_ok=True)
(wi_dir / "15-staging-log.md").write_text(content)
# ------------------------------------------------------------------
# Self-hosting (orchestrator) path -- real file check
# ------------------------------------------------------------------
def test_success_verdict_passes(self, orch_dir):
self._write_log(
orch_dir,
"---\nstaging_status: SUCCESS\ntimestamp: 2026-06-05T00:00:00Z\n---\n\nAll staging tests passed.\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035")
assert passed is True
assert "SUCCESS" in reason
def test_failed_verdict_fails(self, orch_dir):
self._write_log(
orch_dir,
"---\nstaging_status: FAILED\ntimestamp: 2026-06-05T00:00:00Z\n---\n\n2 tests failed.\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035")
assert passed is False
assert "FAILED" in reason
def test_no_file_fails_for_self_hosting(self, orch_dir):
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035")
assert passed is False
assert "not found" in reason.lower()
def test_no_field_fails(self, orch_dir):
# Frontmatter present but no staging_status field -> must NOT pass.
self._write_log(
orch_dir,
"---\nversion: v0.0.3\n---\n\nStatus: all good (prose only).\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035")
assert passed is False
def test_prose_only_no_frontmatter_fails(self, orch_dir):
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
self._write_log(
orch_dir,
"# Staging Log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035")
assert passed is False
def test_origin_main_success_passes_when_absent_in_worktree(self, monkeypatch):
# Deployer merged 15-staging-log.md into main; not in worktree -> recover from main.
monkeypatch.setattr(
"src.qg.checks._staging_log_from_main",
lambda repo, wi: "---\nstaging_status: SUCCESS\n---\n\nAll good.\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035-main")
assert passed is True
assert "SUCCESS" in reason
def test_origin_main_failed_fails(self, monkeypatch):
monkeypatch.setattr(
"src.qg.checks._staging_log_from_main",
lambda repo, wi: "---\nstaging_status: FAILED\n---\n\nboom.\n",
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035-main")
assert passed is False
assert "FAILED" in reason
def test_absent_everywhere_fails(self, monkeypatch):
monkeypatch.setattr(
"src.qg.checks._staging_log_from_main", lambda repo, wi: None
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("orchestrator", "ORCH-035-absent")
assert passed is False
assert "not found" in reason.lower()
# ------------------------------------------------------------------
# Non-self-hosting path -- instant pass, no file dependency
# ------------------------------------------------------------------
def test_non_self_hosting_passes_immediately_no_file(self, tmp_path, monkeypatch):
"""Non-self-hosting repo: gate is N/A even without a staging log file."""
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("enduro-trails", "ET-035")
assert passed is True
assert "N/A" in reason
assert "enduro-trails" in reason
def test_non_self_hosting_passes_regardless_of_file_content(self, tmp_path, monkeypatch):
"""Even a FAILED staging log must not block a non-self-hosting repo."""
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
et_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / "ET-035"
et_dir.mkdir(parents=True)
(et_dir / "15-staging-log.md").write_text(
"---\nstaging_status: FAILED\n---\nShould be ignored.\n"
)
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("enduro-trails", "ET-035")
assert passed is True
assert "N/A" in reason
def test_unknown_repo_also_passes_immediately(self, tmp_path, monkeypatch):
"""Any repo that is not orchestrator gets N/A gate."""
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
from src.qg.checks import check_staging_status
passed, reason = check_staging_status("some-other-project", "XY-001")
assert passed is True
assert "N/A" in reason
# ------------------------------------------------------------------
# is_self_hosting_repo helper
# ------------------------------------------------------------------
def test_is_self_hosting_true_for_orchestrator(self):
from src.qg.checks import is_self_hosting_repo
assert is_self_hosting_repo("orchestrator") is True
def test_is_self_hosting_case_insensitive(self):
from src.qg.checks import is_self_hosting_repo
assert is_self_hosting_repo("Orchestrator") is True
assert is_self_hosting_repo("ORCHESTRATOR") is True
def test_is_self_hosting_false_for_enduro_trails(self):
from src.qg.checks import is_self_hosting_repo
assert is_self_hosting_repo("enduro-trails") is False
def test_is_self_hosting_false_for_empty(self):
from src.qg.checks import is_self_hosting_repo
assert is_self_hosting_repo("") is False
assert is_self_hosting_repo(None) is False
# ------------------------------------------------------------------
# Stage machinery (regression: must not be broken)
# ------------------------------------------------------------------
def test_deploy_staging_qg_is_check_staging_status(self):
assert get_qg_for_stage("deploy-staging") == "check_staging_status"
def test_registered_in_qg_checks(self):
from src.qg.checks import QG_CHECKS, check_staging_status
assert QG_CHECKS.get("check_staging_status") is check_staging_status
def test_deploy_stage_qg_still_check_deploy_status(self):
"""Regression: existing deploy QG must not be broken."""
assert get_qg_for_stage("deploy") == "check_deploy_status"
def test_stage_chain(self):
"""Full chain: testing->deploy-staging->deploy->done."""
from src.stages import get_next_stage
assert get_next_stage("testing") == "deploy-staging"
assert get_next_stage("deploy-staging") == "deploy"
assert get_next_stage("deploy") == "done"

View File

@@ -136,7 +136,7 @@ class TestHappyPathAgentSelection:
("architecture", "development", "developer"),
("development", "review", "reviewer"),
("review", "testing", "tester"),
("testing", "deploy", "deployer"),
("testing", "deploy-staging", "deployer"),
],
)
def test_advance_launches_current_stage_agent(
@@ -507,6 +507,120 @@ class TestAnalysisApprovedFlow:
flow.assert_called_once()
# ---------------------------------------------------------------------------
# ORCH-35: deploy-staging gate — rollback on staging failure
# ---------------------------------------------------------------------------
class TestStagingGate:
"""deploy-staging -> deploy must be gated on check_staging_status.
FAILED verdict rolls back to development (same as deploy БАГ-8 pattern:
staging failure = code is bad, needs developer fix)."""
def test_staging_success_advances_to_deploy(self, monkeypatch):
"""Happy path: staging SUCCESS -> advance to deploy (no agent launched)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_staging_status": _pass},
)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ET-035",
"feature/ET-035-x", finished_agent="deployer",
)
assert res.advanced is True
assert res.to_stage == "deploy"
assert _stage(task_id) == "deploy"
# deploy-staging has agent=deployer, so deployer is enqueued for deploy stage
assert res.enqueued_agent == "deployer"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "deployer"
def test_staging_failed_rolls_back_to_development(self, monkeypatch):
"""ORCH-35: staging FAILED -> roll back to development, not to testing."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _fail("Staging status: FAILED")},
)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ET-035",
"feature/ET-035-x", finished_agent="deployer",
)
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development" # NOT deploy, NOT testing
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert stage_engine.send_telegram.called
def test_staging_failed_does_not_reach_deploy(self, monkeypatch):
"""Prod deploy is unreachable if staging gate is not green."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _fail("Staging log not found")},
)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ET-035",
"feature/ET-035-x", finished_agent="deployer",
)
assert res.advanced is False
# Task must NOT be in deploy stage
assert _stage(task_id) != "deploy"
def test_staging_missing_log_rolls_back(self, monkeypatch):
"""Missing 15-staging-log.md -> gate fails -> rollback to development."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_staging_status": _fail("Staging log not found (15-staging-log.md)")},
)
task_id = _make_task("deploy-staging")
res = advance_stage(
task_id, "deploy-staging", "enduro-trails", "ET-035",
"feature/ET-035-x", finished_agent="deployer",
)
assert res.advanced is False
assert _stage(task_id) == "development"
def test_testing_to_deploy_staging_advance(self, monkeypatch):
"""testing -> deploy-staging: deployer is enqueued (ORCH-35 chain check)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_passed": _pass},
)
task_id = _make_task("testing")
res = advance_stage(
task_id, "testing", "enduro-trails", "ET-035",
"feature/ET-035-x", finished_agent="tester",
)
assert res.advanced is True
assert res.to_stage == "deploy-staging"
assert _stage(task_id) == "deploy-staging"
assert res.enqueued_agent == "deployer"
def test_deploy_still_rolls_back_on_check_deploy_status_fail(self, monkeypatch):
"""Existing БАГ-8 rollback must still work for deploy stage (regression guard)."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _fail("Deploy status: FAILED")},
)
task_id = _make_task("deploy")
res = advance_stage(
task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer",
)
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development"
assert res.alerted is True
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine
# ---------------------------------------------------------------------------