Compare commits

...

10 Commits

Author SHA1 Message Date
Dev Agent
3a285de11d fix(ci): bounce task back to developer on red CI (capped retries) 2026-06-04 01:39:40 +03:00
7922f6b67b Merge pull request 'fix(qg): use check_ci_green instead of local tests on development stage' (#17) from fix/drop-local-tests-qg into main 2026-06-04 01:24:14 +03:00
Dev Agent
e15d339b14 fix(qg): use check_ci_green instead of local tests on development stage 2026-06-04 01:22:43 +03:00
994f73a78e Merge pull request 'fix(qg): run pytest directly instead of make in check_tests_local' (#16) from fix/qg-pytest-no-make into main 2026-06-04 00:44:40 +03:00
orchestrator-dev
90c9ffe839 fix(qg): run pytest directly instead of make in check_tests_local 2026-06-04 00:43:04 +03:00
b6aa107f93 Merge pull request 'fix(stage): approved verdict advances analysis->architecture instead of re-running gate' (#15) from fix/approved-advances-stage into main 2026-06-03 23:31:45 +03:00
Dev Agent
0b8013cb06 fix(stage): approved verdict advances analysis->architecture instead of re-running gate 2026-06-03 23:30:08 +03:00
b01643fcc3 Merge pull request 'feat(config): external gitea_public_url for clickable doc links' (#14) from fix/gitea-public-url into main 2026-06-03 22:59:17 +03:00
Dev Agent
ca63bc26bb feat(config): external gitea_public_url for clickable doc links 2026-06-03 22:58:18 +03:00
dce9ac806b Merge pull request 'fix(pipeline): description+name to analyst, status-only analyst comment with doc links' (#13) from fix/taskmd-description into main 2026-06-03 22:45:17 +03:00
9 changed files with 355 additions and 39 deletions

View File

@@ -22,6 +22,7 @@ class Settings(BaseSettings):
# Gitea # Gitea
gitea_url: str = "http://localhost:3000" gitea_url: str = "http://localhost:3000"
gitea_public_url: str = "" # external URL for clickable links in comments; falls back to gitea_url
gitea_token: str = "" gitea_token: str = ""
gitea_webhook_secret: str = "" gitea_webhook_secret: str = ""
gitea_owner: str = "admin" gitea_owner: str = "admin"

View File

@@ -249,9 +249,17 @@ def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = No
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]: def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
""" """
DEPRECATED: replaced by check_ci_green on the development stage (CI is now
configured). Kept for backward-compat; not wired to any stage.
S-1 fix: run the project test suite locally and judge by exit code, instead of S-1 fix: run the project test suite locally and judge by exit code, instead of
depending on Gitea CI (which is not configured -> always false). depending on Gitea CI (which is not configured -> always false).
БАГ 5 fix: invoke pytest directly instead of make test. make is not installed
in the orchestrator container, so the previous ["make", "test"] call raised
FileNotFoundError. This reproduces the Makefile test target 1:1
(cd src/api && python -m pytest ../../tests/ -v).
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
is safe for concurrent active tasks — no shared /repos checkout race. is safe for concurrent active tasks — no shared /repos checkout race.
""" """
@@ -259,7 +267,8 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
try: try:
repo_path = ensure_worktree(repo, branch) repo_path = ensure_worktree(repo, branch)
r = subprocess.run( r = subprocess.run(
["make", "test"], cwd=repo_path, ["python", "-m", "pytest", "../../tests/", "-v"],
cwd=os.path.join(repo_path, "src", "api"),
capture_output=True, text=True, timeout=600, capture_output=True, text=True, timeout=600,
) )
if r.returncode == 0: if r.returncode == 0:

View File

@@ -189,36 +189,48 @@ def advance_stage(
# --- Quality gate ---------------------------------------------------- # --- Quality gate ----------------------------------------------------
if qg_name and qg_name in QG_CHECKS: if qg_name and qg_name in QG_CHECKS:
# Human-approval gate: special analyst approved-flow (launcher only). # Human-approval gate: split by path.
if qg_name == "check_analysis_approved": if qg_name == "check_analysis_approved":
_handle_analysis_approved_flow( # Launcher path (analyst just finished): set In Review + ask for
task_id, current_stage, repo, work_item_id, branch, agent, result # the Approved status. This gate never advances on its own -- a
) # human Approved verdict does that.
return result if agent == "analyst":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
# Webhook Approved-verdict path (agent is None): the human flipped
# the Plane status to Approved, which IS the approval. The gate is
# satisfied -- do NOT re-run check_analysis_approved (it looks for
# an :approved: *comment* and would block on a status-only
# approval). Mark it passed and fall through to the Advance block.
result.qg_name = qg_name
result.qg_passed = True
result.qg_reason = "approved-via-status"
else:
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
passed, reason = _run_qg(qg_name, repo, work_item_id, branch) if not passed:
result.qg_passed = passed logger.info(
result.qg_reason = reason f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
if not passed: _handle_qg_failure_rollbacks(
logger.info( task_id, current_stage, repo, work_item_id, branch,
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}" agent, qg_name, reason, result,
) )
# Behaviour parity: return result
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
elif qg_name: elif qg_name:
# QG name set but not registered — do not advance (launcher behavior). # QG name set but not registered — do not advance (launcher behavior).
@@ -296,7 +308,7 @@ def _build_analyst_ready_comment(repo: str, work_item_id: str, branch: str) -> s
wt_dir = None wt_dir = None
owner = getattr(settings, "gitea_owner", "admin") owner = getattr(settings, "gitea_owner", "admin")
base = settings.gitea_url.rstrip("/") base = (getattr(settings, "gitea_public_url", "") or settings.gitea_url).rstrip("/")
links = [] links = []
for label, fname in candidates: for label, fname in candidates:
if wt_dir and not os.path.isfile(os.path.join(wt_dir, fname)): if wt_dir and not os.path.isfile(os.path.join(wt_dir, fname)):

View File

@@ -13,7 +13,7 @@ STAGE_TRANSITIONS = {
"created": {"next": "analysis", "agent": "analyst", "qg": None}, "created": {"next": "analysis", "agent": "analyst", "qg": None},
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"}, "analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"}, "architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
"development": {"next": "review", "agent": "reviewer", "qg": "check_tests_local"}, "development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"}, "review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"}, "testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
"deploy": {"next": "done", "agent": None, "qg": None}, "deploy": {"next": "done", "agent": None, "qg": None},

View File

@@ -216,12 +216,31 @@ async def handle_ci_status(payload: dict):
else: else:
notify_qg_failure(task_id, current_stage, "check_ci_green", reason) notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
elif state == "failure": elif state == "failure" and current_stage == "development":
# S-1: Gitea CI is NOT the authoritative gate anymore (the orchestrator runs # CI is the authoritative gate for development -> review.
# tests locally via check_tests_local). Gitea CI is often unconfigured, so a # On red CI: notify, then bounce the task back to the developer (capped retries),
# "failure"/empty status here is not actionable. Log only, do not alert. # symmetric to the review REQUEST_CHANGES path.
logger.debug(f"Task {task_id}: Gitea CI state='failure' on branch '{branch}' " notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
f"(non-authoritative, suppressed — local tests are the gate)") conn = get_db()
retry_count = conn.execute(
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
(task_id,),
).fetchone()["cnt"]
conn.close()
if retry_count < MAX_DEV_RETRIES:
# task already on 'development' — no stage change needed, just relaunch developer
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
async def handle_pr(payload: dict): async def handle_pr(payload: dict):

View File

@@ -25,7 +25,9 @@ def test_analyst_comment_asks_approved_with_links(monkeypatch, tmp_path):
# 04b-ui-test-cases.md intentionally absent -> must NOT be linked # 04b-ui-test-cases.md intentionally absent -> must NOT be linked
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt)) monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(SE.settings, "gitea_url", "https://git.mva154.duckdns.org") # public URL set -> links must be built from it (not gitea_url)
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
monkeypatch.setattr(SE.settings, "gitea_public_url", "https://git.mva154.duckdns.org")
monkeypatch.setattr(SE.settings, "gitea_owner", "admin") monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
html = SE._build_analyst_ready_comment( html = SE._build_analyst_ready_comment(
@@ -45,3 +47,28 @@ def test_analyst_comment_asks_approved_with_links(monkeypatch, tmp_path):
assert base + "04-test-plan.yaml" in html assert base + "04-test-plan.yaml" in html
# the missing file is NOT invented # the missing file is NOT invented
assert "04b-ui-test-cases.md" not in html assert "04b-ui-test-cases.md" not in html
# internal git url must NOT appear in clickable links
assert "localhost:3000" not in html
def test_analyst_comment_falls_back_to_gitea_url(monkeypatch, tmp_path):
"""When gitea_public_url is empty, links fall back to gitea_url."""
from src import stage_engine as SE
wt = tmp_path / "wt"
docs = wt / "docs" / "work-items" / "ET-011"
docs.mkdir(parents=True)
(docs / "01-brd.md").write_text("x")
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
monkeypatch.setattr(SE.settings, "gitea_public_url", "")
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
html = SE._build_analyst_ready_comment(
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
)
base = ("http://localhost:3000/admin/enduro-trails/src/branch/"
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
assert base + "01-brd.md" in html

View File

@@ -17,7 +17,9 @@ from src.qg.checks import (
check_ci_green, check_ci_green,
check_review_approved, check_review_approved,
check_tests_passed, check_tests_passed,
check_tests_local,
) )
from src.stages import get_qg_for_stage
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -186,3 +188,57 @@ class TestCheckTestsPassed:
passed, reason = check_tests_passed("enduro-trails", "ET-001") passed, reason = check_tests_passed("enduro-trails", "ET-001")
assert passed is False assert passed is False
assert "not found" in reason.lower() assert "not found" in reason.lower()
class TestDevelopmentStageQG:
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
gate), not the deprecated check_tests_local."""
def test_development_qg_is_check_ci_green(self):
assert get_qg_for_stage("development") == "check_ci_green"
def test_check_tests_local_is_deprecated_and_unwired(self):
# Kept in the registry for backward-compat, but not wired to any stage.
from src.qg.checks import QG_CHECKS
from src.stages import STAGE_TRANSITIONS
assert "check_tests_local" in QG_CHECKS
wired = {t.get("qg") for t in STAGE_TRANSITIONS.values()}
assert "check_tests_local" not in wired
class TestCheckTestsLocal:
"""BUG 5: check_tests_local must run pytest directly (not make, which is
not installed in the orchestrator container)."""
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_passes_on_returncode_zero(self, mock_run, mock_wt, tmp_path):
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
assert passed is True
assert reason == "Local tests passed"
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_fails_on_nonzero_returncode(self, mock_run, mock_wt, tmp_path):
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=1, stdout="boom", stderr="trace")
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
assert passed is False
assert "Local tests failed" in reason
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_invokes_pytest_not_make(self, mock_run, mock_wt, tmp_path):
"""The subprocess call must be pytest, from src/api, against ../../tests/."""
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
check_tests_local("enduro-trails", "feature/ET-001-x")
args, kwargs = mock_run.call_args
cmd = args[0]
assert "make" not in cmd
assert cmd[:3] == ["python", "-m", "pytest"]
assert "../../tests/" in cmd
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")

View File

@@ -203,10 +203,13 @@ class TestQgFailureDoesNotAdvance:
assert _jobs() == [] assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch): def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity).""" """finished_agent=None -> generic QG-failure notification fires (plane parity).
development stage QG is now check_ci_green (was check_tests_local).
"""
monkeypatch.setattr( monkeypatch.setattr(
stage_engine, "QG_CHECKS", stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")}, {**stage_engine.QG_CHECKS, "check_ci_green": _fail("ci red")},
) )
task_id = _make_task("development") task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001", advance_stage(task_id, "development", "enduro-trails", "ET-001",
@@ -358,6 +361,63 @@ class TestAnalysisApprovedFlow:
assert stage_engine.notify_approve_requested.called assert stage_engine.notify_approve_requested.called
assert _jobs() == [] assert _jobs() == []
def test_approved_verdict_advances_analysis_to_architecture(self, monkeypatch):
"""BUG 4: a human Approved STATUS (webhook path, finished_agent=None)
must satisfy the analysis gate and advance analysis -> architecture,
enqueuing the architect. The status-only approval must NOT re-run
check_analysis_approved (which looks for an :approved: COMMENT and would
otherwise wrongly block the advance).
"""
# Make check_analysis_approved FAIL if it is ever called: the webhook
# path must bypass it entirely (status == approval). If the engine were
# to re-run the gate, this would block the advance and fail the test.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{
**stage_engine.QG_CHECKS,
"check_analysis_approved": _fail("no :approved: comment"),
},
)
# Guard: the approval-flow (launcher-only) must NOT be invoked here.
flow = MagicMock()
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
task_id = _make_task("analysis")
res = advance_stage(
task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == "architecture"
assert _stage(task_id) == "architecture"
assert res.enqueued_agent == "architect"
# Sanity: agent for analysis is architect, never analyst (no re-run loop).
assert get_agent_for_stage("analysis") == "architect"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "architect"
# The launcher-only approval-flow was NOT called on the webhook path.
flow.assert_not_called()
def test_launcher_path_does_not_advance_and_calls_flow(self, monkeypatch):
"""Regression: the launcher path (finished_agent='analyst') still routes
into _handle_analysis_approved_flow and does NOT advance.
"""
flow = MagicMock()
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
task_id = _make_task("analysis")
res = advance_stage(
task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst",
)
assert res.advanced is not True
assert _stage(task_id) == "analysis"
assert _jobs() == []
flow.assert_called_once()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine # launcher + plane both delegate to the engine

View File

@@ -1,4 +1,5 @@
import pytest import pytest
import asyncio
import os import os
import tempfile import tempfile
from unittest.mock import patch, MagicMock, AsyncMock from unittest.mock import patch, MagicMock, AsyncMock
@@ -272,6 +273,46 @@ def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
assert task["stage"] == "review" assert task["stage"] == "review"
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.launcher")
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
"""BUG 6: CI failure at development is now the authoritative QG gate failing.
It must notify QG failure (not silently suppress) and must NOT advance the stage.
"""
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
)
conn.commit()
conn.close()
resp = client.post(
"/webhook/gitea",
json={
"state": "failure",
"branches": [{"name": "feature/ET-011-test"}],
"repository": {"name": "enduro-trails"},
},
headers={"X-Gitea-Event": "status"},
)
assert resp.status_code == 200
# QG failure was reported for the development stage with check_ci_green.
assert mock_notify.called
args, kwargs = mock_notify.call_args
call = list(args) + list(kwargs.values())
assert "development" in call
assert "check_ci_green" in call
# Stage did NOT advance.
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'").fetchone()
conn.close()
assert task["stage"] == "development"
def test_gitea_webhook_pr(): def test_gitea_webhook_pr():
"""PR event is accepted.""" """PR event is accepted."""
resp = client.post( resp = client.post(
@@ -301,3 +342,94 @@ def test_plane_webhook_event_logged():
conn.close() conn.close()
assert event is not None assert event is not None
assert event["source"] == "plane" assert event["source"] == "plane"
# ---------------------------------------------------------------------------
# BUG 7: red CI on development must bounce the task back to the developer
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
# ---------------------------------------------------------------------------
def _ci_failure_payload():
return {
"state": "failure",
"branches": [{"name": "feature/ET-011-test"}],
"repository": {"name": "enduro-trails"},
}
def _mock_db_with_retry_count(count):
"""Build a get_db() mock whose retry_count query returns `count`."""
conn = MagicMock()
conn.execute.return_value.fetchone.return_value = {"cnt": count}
return conn
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_retries_developer_under_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
from src.webhooks.gitea import handle_ci_status
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(0)
mock_enqueue.return_value = 42
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure was still reported (Slava sees both the failure and the retry).
assert mock_qg.called
# developer was re-enqueued.
assert mock_enqueue.called
assert mock_enqueue.call_args[0][0] == "developer"
# No escalation.
assert not mock_err.called
# Stage stays on development — no update_task_stage in the CI-failure path.
assert not mock_update_stage.called
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_escalates_at_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure still reported.
assert mock_qg.called
# developer NOT re-enqueued at the cap.
assert not mock_enqueue.called
# Escalation message mentions CI failure.
assert mock_err.called
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
assert "Max developer retries" in err_msg
assert "after CI failure" in err_msg
# Stage untouched.
assert not mock_update_stage.called