Compare commits

...

4 Commits

7 changed files with 296 additions and 3 deletions

View File

@@ -281,6 +281,44 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
return False, f"Local test run error: {e}"
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
"""
БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable
verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code
(which is always 0 on a successful agent session even when the deploy failed).
Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML
frontmatter. Returns:
(True, ...) -> deploy_status: SUCCESS
(False, ...) -> deploy_status: FAILED, missing field, or no frontmatter
"""
import yaml
repo_path = _repo_path(repo, branch)
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
if not os.path.isfile(log_path):
return False, "Deploy log not found (14-deploy-log.md)"
try:
with open(log_path, "r") as f:
content = f.read()
status = None
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as e:
return False, f"Invalid YAML frontmatter in deploy log: {e}"
status = str(fm.get("deploy_status", "")).upper().strip()
if status == "SUCCESS":
return True, "Deploy status: SUCCESS"
if status == "FAILED":
return False, "Deploy status: FAILED"
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
except OSError as e:
return False, f"Error reading deploy log: {e}"
# Registry for dynamic lookup by name
QG_CHECKS = {
"check_analysis_approved": check_analysis_approved,
@@ -291,4 +329,5 @@ QG_CHECKS = {
"check_tests_passed": check_tests_passed,
"check_reviewer_verdict": check_reviewer_verdict,
"check_tests_local": check_tests_local,
"check_deploy_status": check_deploy_status,
}

View File

@@ -490,3 +490,31 @@ def _handle_qg_failure_rollbacks(
f"Task {task_id}: architect conflict, enqueued analyst "
f"(job_id={new_job})"
)
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
# The launcher's exit_code-based guard (launcher.py:475) never fires because
# the LLM process exit code is always 0; this gate fires on the machine-readable
# deploy_status verdict in 14-deploy-log.md instead. Mirrors the launcher block
# (rollback + set_issue_blocked + notify) but is driven by the VERDICT.
if agent == "deployer" and qg_name == "check_deploy_status":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_blocked(work_item_id)
notify_qg_failure(task_id, "deploy", "check_deploy_status", reason)
plane_add_comment(
work_item_id,
f"\u274c Deploy FAILED ({reason}). Rolled back to development. "
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="deployer",
)
send_telegram(
f"\U0001f6a8 {work_item_id}: Deploy FAILED ({reason}). "
f"Rolled back to development. Needs fix."
)
result.alerted = True
logger.error(
f"Task {task_id}: deployer verdict FAILED, rolled back deploy -> "
f"development ({reason})"
)

View File

@@ -16,7 +16,7 @@ STAGE_TRANSITIONS = {
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
"deploy": {"next": "done", "agent": None, "qg": None},
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
"done": {"next": None, "agent": None, "qg": None},
}

View File

@@ -217,9 +217,30 @@ async def handle_ci_status(payload: dict):
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
elif state == "failure" and current_stage == "development":
# CI is now the authoritative gate for development -> review.
# A failing CI means the QG did not pass; notify (do not silently advance).
# CI is the authoritative gate for development -> review.
# On red CI: notify, then bounce the task back to the developer (capped retries),
# symmetric to the review REQUEST_CHANGES path.
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
conn = get_db()
retry_count = conn.execute(
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
(task_id,),
).fetchone()["cnt"]
conn.close()
if retry_count < MAX_DEV_RETRIES:
# task already on 'development' — no stage change needed, just relaunch developer
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
async def handle_pr(payload: dict):

View File

@@ -18,6 +18,7 @@ from src.qg.checks import (
check_review_approved,
check_tests_passed,
check_tests_local,
check_deploy_status,
)
from src.stages import get_qg_for_stage
@@ -190,6 +191,65 @@ class TestCheckTestsPassed:
assert "not found" in reason.lower()
class TestCheckDeployStatus:
"""BUG 8: deploy -> done must be gated on the deployer's machine-readable
deploy_status verdict in 14-deploy-log.md frontmatter, NOT the LLM exit code
(always 0). Mirrors check_reviewer_verdict (reads ONLY the frontmatter field)."""
def _write_log(self, repo_dir, content):
wi_dir = repo_dir / "docs" / "work-items" / "ET-011"
wi_dir.mkdir(parents=True)
(wi_dir / "14-deploy-log.md").write_text(content)
def test_success_verdict_passes(self, setup_work_item_dir):
self._write_log(
setup_work_item_dir,
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is True
assert "SUCCESS" in reason
def test_failed_verdict_fails(self, setup_work_item_dir):
self._write_log(
setup_work_item_dir,
"---\ndeploy_status: FAILED\nversion: v0.0.3\n---\n\npermission denied.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
assert "FAILED" in reason
def test_no_file_fails(self, setup_work_item_dir):
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
assert "not found" in reason.lower()
def test_no_field_fails(self, setup_work_item_dir):
# Frontmatter present but no deploy_status field -> must NOT pass.
self._write_log(
setup_work_item_dir,
"---\nversion: v0.0.3\n---\n\nStatus: FAILED (prose only).\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
def test_prose_only_no_frontmatter_fails(self, setup_work_item_dir):
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
self._write_log(
setup_work_item_dir,
"# Deploy log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
def test_deploy_stage_qg_is_check_deploy_status(self):
assert get_qg_for_stage("deploy") == "check_deploy_status"
def test_registered_in_qg_checks(self):
from src.qg.checks import QG_CHECKS
assert QG_CHECKS.get("check_deploy_status") is check_deploy_status
class TestDevelopmentStageQG:
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
gate), not the deprecated check_tests_local."""

View File

@@ -300,6 +300,59 @@ class TestTesterFail:
assert _jobs() == []
# ---------------------------------------------------------------------------
# BUG 8: deploy verdict gates deploy -> done (not the LLM exit code)
# ---------------------------------------------------------------------------
class TestDeployVerdict:
"""deploy -> done must be gated on check_deploy_status (the deployer's
machine-readable verdict), NOT on the LLM exit code (always 0)."""
def test_failed_verdict_rolls_back_to_development(self, monkeypatch):
# deployer finished (exit_code 0 from launcher), but verdict is FAILED.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _fail("Deploy status: FAILED")},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development" # NOT done
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert stage_engine.send_telegram.called
def test_no_deploy_log_rolls_back(self, monkeypatch):
# No frontmatter field / no file -> check returns False -> rollback.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _fail("Deploy log not found (14-deploy-log.md)")},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is False
assert _stage(task_id) == "development"
def test_success_verdict_advances_to_done(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _pass},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is True
assert res.to_stage == "done"
assert _stage(task_id) == "done"
assert res.enqueued_agent is None # no agent leaves deploy
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------

View File

@@ -1,4 +1,5 @@
import pytest
import asyncio
import os
import tempfile
from unittest.mock import patch, MagicMock, AsyncMock
@@ -341,3 +342,94 @@ def test_plane_webhook_event_logged():
conn.close()
assert event is not None
assert event["source"] == "plane"
# ---------------------------------------------------------------------------
# BUG 7: red CI on development must bounce the task back to the developer
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
# ---------------------------------------------------------------------------
def _ci_failure_payload():
return {
"state": "failure",
"branches": [{"name": "feature/ET-011-test"}],
"repository": {"name": "enduro-trails"},
}
def _mock_db_with_retry_count(count):
"""Build a get_db() mock whose retry_count query returns `count`."""
conn = MagicMock()
conn.execute.return_value.fetchone.return_value = {"cnt": count}
return conn
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_retries_developer_under_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
from src.webhooks.gitea import handle_ci_status
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(0)
mock_enqueue.return_value = 42
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure was still reported (Slava sees both the failure and the retry).
assert mock_qg.called
# developer was re-enqueued.
assert mock_enqueue.called
assert mock_enqueue.call_args[0][0] == "developer"
# No escalation.
assert not mock_err.called
# Stage stays on development — no update_task_stage in the CI-failure path.
assert not mock_update_stage.called
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_escalates_at_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure still reported.
assert mock_qg.called
# developer NOT re-enqueued at the cap.
assert not mock_enqueue.called
# Escalation message mentions CI failure.
assert mock_err.called
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
assert "Max developer retries" in err_msg
assert "after CI failure" in err_msg
# Stage untouched.
assert not mock_update_stage.called