Compare commits

...

16 Commits

Author SHA1 Message Date
dev-agent
e4a9c48395 fix(deploy): gate deploy->done on deployer verdict, not LLM exit code 2026-06-04 02:43:01 +03:00
a0621b9952 Merge pull request 'fix(ci): bounce task back to developer on red CI (capped retries)' (#18) from fix/ci-fail-retry-developer into main 2026-06-04 01:41:01 +03:00
Dev Agent
3a285de11d fix(ci): bounce task back to developer on red CI (capped retries) 2026-06-04 01:39:40 +03:00
7922f6b67b Merge pull request 'fix(qg): use check_ci_green instead of local tests on development stage' (#17) from fix/drop-local-tests-qg into main 2026-06-04 01:24:14 +03:00
Dev Agent
e15d339b14 fix(qg): use check_ci_green instead of local tests on development stage 2026-06-04 01:22:43 +03:00
994f73a78e Merge pull request 'fix(qg): run pytest directly instead of make in check_tests_local' (#16) from fix/qg-pytest-no-make into main 2026-06-04 00:44:40 +03:00
orchestrator-dev
90c9ffe839 fix(qg): run pytest directly instead of make in check_tests_local 2026-06-04 00:43:04 +03:00
b6aa107f93 Merge pull request 'fix(stage): approved verdict advances analysis->architecture instead of re-running gate' (#15) from fix/approved-advances-stage into main 2026-06-03 23:31:45 +03:00
Dev Agent
0b8013cb06 fix(stage): approved verdict advances analysis->architecture instead of re-running gate 2026-06-03 23:30:08 +03:00
b01643fcc3 Merge pull request 'feat(config): external gitea_public_url for clickable doc links' (#14) from fix/gitea-public-url into main 2026-06-03 22:59:17 +03:00
Dev Agent
ca63bc26bb feat(config): external gitea_public_url for clickable doc links 2026-06-03 22:58:18 +03:00
dce9ac806b Merge pull request 'fix(pipeline): description+name to analyst, status-only analyst comment with doc links' (#13) from fix/taskmd-description into main 2026-06-03 22:45:17 +03:00
dev-agent
a9cdb17614 feat(plane): analyst comment asks for Approved status + links docs
The analyst ready-comment used the obsolete :approved: wording (comment-based approve was removed in PR #12). Rewrite it for the status-only model: ask the stakeholder to move the issue to Approved (reject = reason comment + Rejected), and add clickable Gitea links to the analyst docs that actually exist in the worktree.
2026-06-03 22:42:53 +03:00
dev-agent
96c5e6b2f9 fix(pipeline): fetch issue name from Plane API on status-trigger start
issue.updated ships only the changed fields, so name was absent and the branch slug became feature/<id>-untitled. Add fetch_issue_fields (single issue-detail GET returning name+description, reusing the endpoint/token of fetch_issue_description) and pull the name above the slug build. Empty name still falls back to untitled.
2026-06-03 22:42:53 +03:00
dev-agent
b91be74692 fix(pipeline): pass issue description to analyst task file
start_pipeline built the analyst .task.md with only the Title, so the analyst received a ~101-byte file and reported the business request as empty even though the description was already fetched. Append the resolved description to task_desc.
2026-06-03 22:42:02 +03:00
2d392b6fc7 Merge pull request 'fix: status-only verdict — remove comment-based approve + fix bug 3 (echo self-hit)' (#12) from fix/status-only-verdict into main 2026-06-03 22:20:46 +03:00
13 changed files with 854 additions and 68 deletions

View File

@@ -22,6 +22,7 @@ class Settings(BaseSettings):
# Gitea
gitea_url: str = "http://localhost:3000"
gitea_public_url: str = "" # external URL for clickable links in comments; falls back to gitea_url
gitea_token: str = ""
gitea_webhook_secret: str = ""
gitea_owner: str = "admin"

View File

@@ -197,6 +197,42 @@ def fetch_issue_description(issue_id: str, project_id: str) -> str:
return ""
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
"""BUG B: GET the Plane issue by UUID ONCE and return (name, description).
Plane's ``issue.updated`` webhook (e.g. a status change) only carries the
CHANGED fields, so BOTH ``name`` and ``description`` are usually absent in
the payload. start_pipeline needs the real title (for the branch slug) and
the real description (for the analyst .task.md). To avoid issuing two
separate issue-detail GETs (one for name, one for description), this single
request returns both.
Reuses the exact GET issue detail endpoint / shared token already used by
``fetch_issue_sequence_id`` / ``fetch_issue_description``. For the
description it applies the same logic as ``fetch_issue_description``
(prefer ``description_stripped``, fall back to stripping
``description_html``).
Returns ("", "") on network error, non-2xx, or missing body - never raises,
so a Plane outage degrades gracefully (caller keeps its payload fallbacks).
"""
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
try:
resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
resp.raise_for_status()
body = resp.json()
name = (body.get("name") or "").strip()
desc = body.get("description_stripped")
if desc and desc.strip():
description = desc
else:
description = _strip_html(body.get("description_html") or "")
return name, description
except Exception as e:
logger.warning(f"fetch_issue_fields failed for {issue_id}: {e}")
return "", ""
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)

View File

@@ -249,9 +249,17 @@ def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = No
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
"""
DEPRECATED: replaced by check_ci_green on the development stage (CI is now
configured). Kept for backward-compat; not wired to any stage.
S-1 fix: run the project test suite locally and judge by exit code, instead of
depending on Gitea CI (which is not configured -> always false).
БАГ 5 fix: invoke pytest directly instead of make test. make is not installed
in the orchestrator container, so the previous ["make", "test"] call raised
FileNotFoundError. This reproduces the Makefile test target 1:1
(cd src/api && python -m pytest ../../tests/ -v).
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
is safe for concurrent active tasks — no shared /repos checkout race.
"""
@@ -259,7 +267,8 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
try:
repo_path = ensure_worktree(repo, branch)
r = subprocess.run(
["make", "test"], cwd=repo_path,
["python", "-m", "pytest", "../../tests/", "-v"],
cwd=os.path.join(repo_path, "src", "api"),
capture_output=True, text=True, timeout=600,
)
if r.returncode == 0:
@@ -272,6 +281,44 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
return False, f"Local test run error: {e}"
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
"""
БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable
verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code
(which is always 0 on a successful agent session even when the deploy failed).
Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML
frontmatter. Returns:
(True, ...) -> deploy_status: SUCCESS
(False, ...) -> deploy_status: FAILED, missing field, or no frontmatter
"""
import yaml
repo_path = _repo_path(repo, branch)
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
if not os.path.isfile(log_path):
return False, "Deploy log not found (14-deploy-log.md)"
try:
with open(log_path, "r") as f:
content = f.read()
status = None
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as e:
return False, f"Invalid YAML frontmatter in deploy log: {e}"
status = str(fm.get("deploy_status", "")).upper().strip()
if status == "SUCCESS":
return True, "Deploy status: SUCCESS"
if status == "FAILED":
return False, "Deploy status: FAILED"
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
except OSError as e:
return False, f"Error reading deploy log: {e}"
# Registry for dynamic lookup by name
QG_CHECKS = {
"check_analysis_approved": check_analysis_approved,
@@ -282,4 +329,5 @@ QG_CHECKS = {
"check_tests_passed": check_tests_passed,
"check_reviewer_verdict": check_reviewer_verdict,
"check_tests_local": check_tests_local,
"check_deploy_status": check_deploy_status,
}

View File

@@ -189,36 +189,48 @@ def advance_stage(
# --- Quality gate ----------------------------------------------------
if qg_name and qg_name in QG_CHECKS:
# Human-approval gate: special analyst approved-flow (launcher only).
# Human-approval gate: split by path.
if qg_name == "check_analysis_approved":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
# Launcher path (analyst just finished): set In Review + ask for
# the Approved status. This gate never advances on its own -- a
# human Approved verdict does that.
if agent == "analyst":
_handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result
)
return result
# Webhook Approved-verdict path (agent is None): the human flipped
# the Plane status to Approved, which IS the approval. The gate is
# satisfied -- do NOT re-run check_analysis_approved (it looks for
# an :approved: *comment* and would block on a status-only
# approval). Mark it passed and fall through to the Advance block.
result.qg_name = qg_name
result.qg_passed = True
result.qg_reason = "approved-via-status"
else:
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
result.qg_passed = passed
result.qg_reason = reason
if not passed:
logger.info(
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
if not passed:
logger.info(
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
)
# Behaviour parity:
# - webhook path (finished_agent is None): emit the generic
# QG-failure notification, exactly like the old plane handler.
# - launcher path (finished_agent set): NO generic notification;
# the rollback branches below own their own messaging, exactly
# like the old launcher handler.
if agent is None:
notify_qg_failure(task_id, current_stage, qg_name, reason)
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
_handle_qg_failure_rollbacks(
task_id, current_stage, repo, work_item_id, branch,
agent, qg_name, reason, result,
)
return result
elif qg_name:
# QG name set but not registered — do not advance (launcher behavior).
@@ -257,6 +269,58 @@ def advance_stage(
return result
def _build_analyst_ready_comment(repo: str, work_item_id: str, branch: str) -> str:
"""BUG C: HTML comment posted when analyst artifacts are ready.
Status-only model (PR #12): approval is the **Approved** status, NOT a
``:approved:`` comment and NOT moving back to In Progress. The comment asks
the stakeholder to flip the status and links the documents the analyst
actually produced.
Links point at the Gitea web view:
{gitea_url}/{owner}/{repo}/src/branch/{branch}/docs/work-items/{wid}/<file>
Only files that REALLY exist in the worktree are listed (no invented docs).
"""
text = (
"\u2705 BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u0414\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f "
"\u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 "
"\u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved. "
"\u0414\u043b\u044f \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f \u2014 "
"\u043d\u0430\u043f\u0438\u0448\u0438\u0442\u0435 \u043f\u0440\u0438\u0447\u0438\u043d\u0443 "
"\u043a\u043e\u043c\u043c\u0435\u043d\u0442\u043e\u043c \u0438 \u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 "
"\u0432 Rejected."
)
# Candidate analyst artifacts (label -> filename). Only existing ones linked.
candidates = [
("Business request", "00-business-request.md"),
("BRD", "01-brd.md"),
("\u0422\u0417 (TRZ)", "02-trz.md"),
("Acceptance Criteria", "03-acceptance-criteria.md"),
("Test Plan", "04-test-plan.yaml"),
("UI Test Cases", "04b-ui-test-cases.md"),
]
rel_dir = f"docs/work-items/{work_item_id}"
try:
wt_dir = os.path.join(get_worktree_path(repo, branch), rel_dir)
except Exception:
wt_dir = None
owner = getattr(settings, "gitea_owner", "admin")
base = (getattr(settings, "gitea_public_url", "") or settings.gitea_url).rstrip("/")
links = []
for label, fname in candidates:
if wt_dir and not os.path.isfile(os.path.join(wt_dir, fname)):
continue
href = f"{base}/{owner}/{repo}/src/branch/{branch}/{rel_dir}/{fname}"
links.append(f'<li><a href="{href}">{label}</a></li>')
if links:
text += "<br><b>\u0414\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u044b:</b><ul>" + "".join(links) + "</ul>"
return text
def _handle_analysis_approved_flow(
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
):
@@ -279,19 +343,17 @@ def _handle_analysis_approved_flow(
files_ok, _ = files_check(repo, work_item_id, branch)
if files_ok:
# Full artifacts ready -> In Review, ask for :approved:.
# Full artifacts ready -> In Review, ask for the Approved STATUS (BUG C).
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
_build_analyst_ready_comment(repo, work_item_id, branch),
author="analyst",
)
notify_approve_requested(task_id)
result.note = "analysis-in-review"
logger.info(
f"Task {task_id}: analyst finished, requested :approved: in Plane"
f"Task {task_id}: analyst finished, requested Approved status in Plane"
)
return
@@ -428,3 +490,31 @@ def _handle_qg_failure_rollbacks(
f"Task {task_id}: architect conflict, enqueued analyst "
f"(job_id={new_job})"
)
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
# The launcher's exit_code-based guard (launcher.py:475) never fires because
# the LLM process exit code is always 0; this gate fires on the machine-readable
# deploy_status verdict in 14-deploy-log.md instead. Mirrors the launcher block
# (rollback + set_issue_blocked + notify) but is driven by the VERDICT.
if agent == "deployer" and qg_name == "check_deploy_status":
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
result.rolled_back_to = "development"
set_issue_blocked(work_item_id)
notify_qg_failure(task_id, "deploy", "check_deploy_status", reason)
plane_add_comment(
work_item_id,
f"\u274c Deploy FAILED ({reason}). Rolled back to development. "
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
author="deployer",
)
send_telegram(
f"\U0001f6a8 {work_item_id}: Deploy FAILED ({reason}). "
f"Rolled back to development. Needs fix."
)
result.alerted = True
logger.error(
f"Task {task_id}: deployer verdict FAILED, rolled back deploy -> "
f"development ({reason})"
)

View File

@@ -13,10 +13,10 @@ STAGE_TRANSITIONS = {
"created": {"next": "analysis", "agent": "analyst", "qg": None},
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
"development": {"next": "review", "agent": "reviewer", "qg": "check_tests_local"},
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
"deploy": {"next": "done", "agent": None, "qg": None},
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
"done": {"next": None, "agent": None, "qg": None},
}

View File

@@ -216,12 +216,31 @@ async def handle_ci_status(payload: dict):
else:
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
elif state == "failure":
# S-1: Gitea CI is NOT the authoritative gate anymore (the orchestrator runs
# tests locally via check_tests_local). Gitea CI is often unconfigured, so a
# "failure"/empty status here is not actionable. Log only, do not alert.
logger.debug(f"Task {task_id}: Gitea CI state='failure' on branch '{branch}' "
f"(non-authoritative, suppressed — local tests are the gate)")
elif state == "failure" and current_stage == "development":
# CI is the authoritative gate for development -> review.
# On red CI: notify, then bounce the task back to the developer (capped retries),
# symmetric to the review REQUEST_CHANGES path.
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
conn = get_db()
retry_count = conn.execute(
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
(task_id,),
).fetchone()["cnt"]
conn.close()
if retry_count < MAX_DEV_RETRIES:
# task already on 'development' — no stage change needed, just relaunch developer
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
async def handle_pr(payload: dict):

View File

@@ -387,22 +387,35 @@ async def start_pipeline(data: dict, project_id: str = ""):
repo = proj.repo
plane_project_id = proj.plane_project_id
# BUG 1: Plane's issue.updated webhook (status change -> In Progress) sends
# only the CHANGED fields, so description / description_stripped are usually
# empty here even though the issue HAS a description. If the payload's
# description is missing/too short, pull the full one from the Plane issue
# detail API (same GET endpoint + shared token already used by
# fetch_issue_sequence_id) before QG-0 runs. If the API is also empty, QG-0
# legitimately fails (truly empty ticket).
if not description or len(description.strip()) < 20:
from ..plane_sync import fetch_issue_description
fetched = fetch_issue_description(plane_id, plane_project_id)
if fetched and len(fetched.strip()) >= len(description.strip()):
description = fetched
# BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress)
# sends only the CHANGED fields, so BOTH description / description_stripped
# AND name are usually empty here even though the issue HAS them. Pull the
# full title + description from the Plane issue detail API in a SINGLE GET
# (fetch_issue_fields: same endpoint + shared token already used by
# fetch_issue_sequence_id) before QG-0 and before the branch slug is built.
# If the API is also empty, QG-0 legitimately fails (truly empty ticket) and
# name falls back to "untitled".
name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3
desc_missing = (not description) or len(description.strip()) < 20
if name_missing or desc_missing:
from ..plane_sync import fetch_issue_fields
fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id)
if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()):
description = fetched_desc
logger.info(
f"start_pipeline: pulled description from Plane API for {plane_id} "
f"({len(description.strip())} chars)"
)
if name_missing and fetched_name and len(fetched_name.strip()) >= 3:
name = fetched_name
logger.info(
f"start_pipeline: pulled name from Plane API for {plane_id} "
f"('{name}')"
)
# BUG B fallback: if name is still empty/blank after the API pull, keep the
# legacy "untitled" so the slug/branch build never crashes on an empty name.
if not name or not name.strip():
name = "untitled"
# QG-0 validation (hard gate on pipeline start)
errors = _qg0_errors(name, description)
@@ -509,7 +522,10 @@ async def start_pipeline(data: dict, project_id: str = ""):
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
if task_row:
task_id = task_row[0]
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
)
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane

View File

@@ -0,0 +1,74 @@
"""BUG C: analyst "artifacts ready" comment under the status-only model.
The comment must ask for the **Approved** status (not the obsolete
":approved:" reaction, not moving back to "In Progress") and link only the
docs that actually exist in the worktree.
"""
import os
import tempfile
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
def test_analyst_comment_asks_approved_with_links(monkeypatch, tmp_path):
from src import stage_engine as SE
# Worktree with only SOME of the candidate docs present.
wt = tmp_path / "wt"
docs = wt / "docs" / "work-items" / "ET-011"
docs.mkdir(parents=True)
for fname in ("00-business-request.md", "01-brd.md", "02-trz.md",
"03-acceptance-criteria.md", "04-test-plan.yaml"):
(docs / fname).write_text("x")
# 04b-ui-test-cases.md intentionally absent -> must NOT be linked
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
# public URL set -> links must be built from it (not gitea_url)
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
monkeypatch.setattr(SE.settings, "gitea_public_url", "https://git.mva154.duckdns.org")
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
html = SE._build_analyst_ready_comment(
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
)
# text asks for the Approved STATUS, not the obsolete mechanisms
assert "Approved" in html
assert ":approved:" not in html
assert "In Progress" not in html
assert "Rejected" in html
# clickable links to docs that ACTUALLY exist
assert "<a href=" in html
base = ("https://git.mva154.duckdns.org/admin/enduro-trails/src/branch/"
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
assert base + "01-brd.md" in html
assert base + "04-test-plan.yaml" in html
# the missing file is NOT invented
assert "04b-ui-test-cases.md" not in html
# internal git url must NOT appear in clickable links
assert "localhost:3000" not in html
def test_analyst_comment_falls_back_to_gitea_url(monkeypatch, tmp_path):
"""When gitea_public_url is empty, links fall back to gitea_url."""
from src import stage_engine as SE
wt = tmp_path / "wt"
docs = wt / "docs" / "work-items" / "ET-011"
docs.mkdir(parents=True)
(docs / "01-brd.md").write_text("x")
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
monkeypatch.setattr(SE.settings, "gitea_public_url", "")
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
html = SE._build_analyst_ready_comment(
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
)
base = ("http://localhost:3000/admin/enduro-trails/src/branch/"
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
assert base + "01-brd.md" in html

View File

@@ -109,17 +109,19 @@ def _to_in_progress_no_desc(plane_id="bug1"):
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description",
return_value="This is a sufficiently long description fetched from Plane API.")
@patch("src.plane_sync.fetch_issue_fields",
return_value=("A valid backlog item title",
"This is a sufficiently long description fetched from Plane API."))
def test_status_start_fetches_description(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
Plane API -> QG-0 passes -> task created + analyst enqueued (NOT blocked)."""
Plane API (single fetch_issue_fields GET) -> QG-0 passes -> task created +
analyst enqueued (NOT blocked)."""
resp = _to_in_progress_no_desc("bug1")
assert resp.status_code == 200
# description was pulled from the API
mock_desc.assert_called_once()
# name + description were pulled from the API in one call
mock_fields.assert_called_once()
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
assert _count("bug1") == 1
assert _task("bug1")["stage"] == "analysis"
@@ -131,15 +133,15 @@ def test_status_start_fetches_description(
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
@patch("src.plane_sync.fetch_issue_description", return_value="")
@patch("src.plane_sync.fetch_issue_fields", return_value=("", ""))
def test_status_start_empty_api_still_blocks(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
fails -> NO task is created (truly empty ticket)."""
resp = _to_in_progress_no_desc("bug1-empty")
assert resp.status_code == 200
mock_desc.assert_called_once()
mock_fields.assert_called_once()
assert _count("bug1-empty") == 0
mock_enqueue.assert_not_called()
@@ -168,10 +170,11 @@ def test_work_item_id_uniqueness():
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
@patch("src.plane_sync.fetch_issue_description",
return_value="A sufficiently long description for QG-0 to pass cleanly.")
@patch("src.plane_sync.fetch_issue_fields",
return_value=("Popup enduro trails feature",
"A sufficiently long description for QG-0 to pass cleanly."))
def test_collision_reassigns_in_start_pipeline(
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
Plane sequence_id is also 6 must NOT reuse ET-006."""

View File

@@ -17,7 +17,10 @@ from src.qg.checks import (
check_ci_green,
check_review_approved,
check_tests_passed,
check_tests_local,
check_deploy_status,
)
from src.stages import get_qg_for_stage
@pytest.fixture(autouse=True)
@@ -186,3 +189,116 @@ class TestCheckTestsPassed:
passed, reason = check_tests_passed("enduro-trails", "ET-001")
assert passed is False
assert "not found" in reason.lower()
class TestCheckDeployStatus:
"""BUG 8: deploy -> done must be gated on the deployer's machine-readable
deploy_status verdict in 14-deploy-log.md frontmatter, NOT the LLM exit code
(always 0). Mirrors check_reviewer_verdict (reads ONLY the frontmatter field)."""
def _write_log(self, repo_dir, content):
wi_dir = repo_dir / "docs" / "work-items" / "ET-011"
wi_dir.mkdir(parents=True)
(wi_dir / "14-deploy-log.md").write_text(content)
def test_success_verdict_passes(self, setup_work_item_dir):
self._write_log(
setup_work_item_dir,
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is True
assert "SUCCESS" in reason
def test_failed_verdict_fails(self, setup_work_item_dir):
self._write_log(
setup_work_item_dir,
"---\ndeploy_status: FAILED\nversion: v0.0.3\n---\n\npermission denied.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
assert "FAILED" in reason
def test_no_file_fails(self, setup_work_item_dir):
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
assert "not found" in reason.lower()
def test_no_field_fails(self, setup_work_item_dir):
# Frontmatter present but no deploy_status field -> must NOT pass.
self._write_log(
setup_work_item_dir,
"---\nversion: v0.0.3\n---\n\nStatus: FAILED (prose only).\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
def test_prose_only_no_frontmatter_fails(self, setup_work_item_dir):
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
self._write_log(
setup_work_item_dir,
"# Deploy log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
def test_deploy_stage_qg_is_check_deploy_status(self):
assert get_qg_for_stage("deploy") == "check_deploy_status"
def test_registered_in_qg_checks(self):
from src.qg.checks import QG_CHECKS
assert QG_CHECKS.get("check_deploy_status") is check_deploy_status
class TestDevelopmentStageQG:
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
gate), not the deprecated check_tests_local."""
def test_development_qg_is_check_ci_green(self):
assert get_qg_for_stage("development") == "check_ci_green"
def test_check_tests_local_is_deprecated_and_unwired(self):
# Kept in the registry for backward-compat, but not wired to any stage.
from src.qg.checks import QG_CHECKS
from src.stages import STAGE_TRANSITIONS
assert "check_tests_local" in QG_CHECKS
wired = {t.get("qg") for t in STAGE_TRANSITIONS.values()}
assert "check_tests_local" not in wired
class TestCheckTestsLocal:
"""BUG 5: check_tests_local must run pytest directly (not make, which is
not installed in the orchestrator container)."""
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_passes_on_returncode_zero(self, mock_run, mock_wt, tmp_path):
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
assert passed is True
assert reason == "Local tests passed"
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_fails_on_nonzero_returncode(self, mock_run, mock_wt, tmp_path):
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=1, stdout="boom", stderr="trace")
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
assert passed is False
assert "Local tests failed" in reason
@patch("src.qg.checks.ensure_worktree")
@patch("subprocess.run")
def test_invokes_pytest_not_make(self, mock_run, mock_wt, tmp_path):
"""The subprocess call must be pytest, from src/api, against ../../tests/."""
mock_wt.return_value = str(tmp_path)
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
check_tests_local("enduro-trails", "feature/ET-001-x")
args, kwargs = mock_run.call_args
cmd = args[0]
assert "make" not in cmd
assert cmd[:3] == ["python", "-m", "pytest"]
assert "../../tests/" in cmd
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")

View File

@@ -203,10 +203,13 @@ class TestQgFailureDoesNotAdvance:
assert _jobs() == []
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
"""finished_agent=None -> generic QG-failure notification fires (plane parity).
development stage QG is now check_ci_green (was check_tests_local).
"""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
{**stage_engine.QG_CHECKS, "check_ci_green": _fail("ci red")},
)
task_id = _make_task("development")
advance_stage(task_id, "development", "enduro-trails", "ET-001",
@@ -297,6 +300,59 @@ class TestTesterFail:
assert _jobs() == []
# ---------------------------------------------------------------------------
# BUG 8: deploy verdict gates deploy -> done (not the LLM exit code)
# ---------------------------------------------------------------------------
class TestDeployVerdict:
"""deploy -> done must be gated on check_deploy_status (the deployer's
machine-readable verdict), NOT on the LLM exit code (always 0)."""
def test_failed_verdict_rolls_back_to_development(self, monkeypatch):
# deployer finished (exit_code 0 from launcher), but verdict is FAILED.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _fail("Deploy status: FAILED")},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is False
assert res.rolled_back_to == "development"
assert _stage(task_id) == "development" # NOT done
assert res.alerted is True
assert stage_engine.set_issue_blocked.called
assert stage_engine.send_telegram.called
def test_no_deploy_log_rolls_back(self, monkeypatch):
# No frontmatter field / no file -> check returns False -> rollback.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _fail("Deploy log not found (14-deploy-log.md)")},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is False
assert _stage(task_id) == "development"
def test_success_verdict_advances_to_done(self, monkeypatch):
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{**stage_engine.QG_CHECKS,
"check_deploy_status": _pass},
)
task_id = _make_task("deploy")
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
"feature/ET-011-x", finished_agent="deployer")
assert res.advanced is True
assert res.to_stage == "done"
assert _stage(task_id) == "done"
assert res.enqueued_agent is None # no agent leaves deploy
assert _jobs() == []
# ---------------------------------------------------------------------------
# Architect conflict -> rollback to analysis + enqueue analyst
# ---------------------------------------------------------------------------
@@ -358,6 +414,63 @@ class TestAnalysisApprovedFlow:
assert stage_engine.notify_approve_requested.called
assert _jobs() == []
def test_approved_verdict_advances_analysis_to_architecture(self, monkeypatch):
"""BUG 4: a human Approved STATUS (webhook path, finished_agent=None)
must satisfy the analysis gate and advance analysis -> architecture,
enqueuing the architect. The status-only approval must NOT re-run
check_analysis_approved (which looks for an :approved: COMMENT and would
otherwise wrongly block the advance).
"""
# Make check_analysis_approved FAIL if it is ever called: the webhook
# path must bypass it entirely (status == approval). If the engine were
# to re-run the gate, this would block the advance and fail the test.
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{
**stage_engine.QG_CHECKS,
"check_analysis_approved": _fail("no :approved: comment"),
},
)
# Guard: the approval-flow (launcher-only) must NOT be invoked here.
flow = MagicMock()
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
task_id = _make_task("analysis")
res = advance_stage(
task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
assert res.advanced is True
assert res.to_stage == "architecture"
assert _stage(task_id) == "architecture"
assert res.enqueued_agent == "architect"
# Sanity: agent for analysis is architect, never analyst (no re-run loop).
assert get_agent_for_stage("analysis") == "architect"
jobs = _jobs()
assert len(jobs) == 1
assert jobs[0]["agent"] == "architect"
# The launcher-only approval-flow was NOT called on the webhook path.
flow.assert_not_called()
def test_launcher_path_does_not_advance_and_calls_flow(self, monkeypatch):
"""Regression: the launcher path (finished_agent='analyst') still routes
into _handle_analysis_approved_flow and does NOT advance.
"""
flow = MagicMock()
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
task_id = _make_task("analysis")
res = advance_stage(
task_id, "analysis", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent="analyst",
)
assert res.advanced is not True
assert _stage(task_id) == "analysis"
assert _jobs() == []
flow.assert_called_once()
# ---------------------------------------------------------------------------
# launcher + plane both delegate to the engine

View File

@@ -0,0 +1,138 @@
"""Tests for fix/taskmd-description (3 bugs at the analyst pipeline entry/exit):
BUG A: start_pipeline built the analyst .task.md WITHOUT the description body
(only Title), so analyst received a ~101-byte file and reported the
"business request is empty". task_desc must now carry the description.
BUG B: issue.updated ships only changed fields, so `name` is usually absent ->
slug/branch became "untitled". start_pipeline must pull the real name
from the Plane API (single fetch_issue_fields GET, above the slug build)
so the branch slug is NOT "untitled".
BUG C: the analyst "artifacts ready" comment used the obsolete ":approved:"
wording. Under the status-only model it must ask for the **Approved**
status (not ":approved:", not "In Progress") and link the docs that
actually exist.
"""
import os
import tempfile
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_taskmd_desc.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
from unittest.mock import patch, AsyncMock # noqa: E402
from fastapi.testclient import TestClient # noqa: E402
from src.main import app # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import projects as P # noqa: E402
from src.projects import reload_projects # noqa: E402
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
client = TestClient(app)
@pytest.fixture(autouse=True)
def setup(monkeypatch):
monkeypatch.setattr(P.settings, "db_path", _test_db)
import src.db as _db
monkeypatch.setattr(_db.settings, "db_path", _test_db)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
registry_json = (
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
)
monkeypatch.setattr(P.settings, "projects_json", registry_json)
reload_projects()
yield
reload_projects()
if os.path.exists(_test_db):
os.unlink(_test_db)
def _task(plane_id):
conn = get_db()
row = conn.execute("SELECT * FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
conn.close()
return row
# --------------------------------------------------------------------------- #
# BUG A: description reaches the analyst .task.md
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
@patch("src.plane_sync.fetch_issue_fields",
return_value=("ET-011 real title",
"REAL BUSINESS REQUEST BODY: user wants GPX upload with "
"validation and a results map."))
def test_taskdesc_includes_description(
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "taskA",
# status change payload: NO name, NO description (only changed field)
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
assert resp.status_code == 200
mock_enqueue.assert_called_once()
# task_desc is the 3rd positional arg of enqueue_job(agent, repo, task_desc, ...)
task_desc = mock_enqueue.call_args.args[2]
assert "Description:" in task_desc
# the actual description body (not just the Title) is in the file
assert "REAL BUSINESS REQUEST BODY" in task_desc
assert "results map" in task_desc
# --------------------------------------------------------------------------- #
# BUG B: name fetched from Plane API when payload is empty -> slug not untitled
# --------------------------------------------------------------------------- #
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
@patch("src.plane_sync.fetch_issue_fields",
return_value=("GPX upload feature",
"A sufficiently long description so QG-0 passes cleanly."))
def test_name_fetched_when_payload_empty(
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
resp = client.post("/webhook/plane", json={
"event": "issue", "action": "updated",
"data": {
"id": "taskB",
# NO name, NO description in the payload (Plane status-change shape)
"project": ENDURO_PLANE_ID,
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
},
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": BACKLOG},
})
assert resp.status_code == 200
mock_fields.assert_called_once()
row = _task("taskB")
assert row is not None
branch = row["branch"]
# slug derived from the fetched name -> "gpx-upload-feature", NOT untitled
assert "untitled" not in branch
assert "gpx-upload-feature" in branch
# Title in the analyst task file is the fetched name, not "untitled"
task_desc = mock_enqueue.call_args.args[2]
assert "Title: GPX upload feature" in task_desc

View File

@@ -1,4 +1,5 @@
import pytest
import asyncio
import os
import tempfile
from unittest.mock import patch, MagicMock, AsyncMock
@@ -272,6 +273,46 @@ def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
assert task["stage"] == "review"
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.launcher")
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
"""BUG 6: CI failure at development is now the authoritative QG gate failing.
It must notify QG failure (not silently suppress) and must NOT advance the stage.
"""
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
)
conn.commit()
conn.close()
resp = client.post(
"/webhook/gitea",
json={
"state": "failure",
"branches": [{"name": "feature/ET-011-test"}],
"repository": {"name": "enduro-trails"},
},
headers={"X-Gitea-Event": "status"},
)
assert resp.status_code == 200
# QG failure was reported for the development stage with check_ci_green.
assert mock_notify.called
args, kwargs = mock_notify.call_args
call = list(args) + list(kwargs.values())
assert "development" in call
assert "check_ci_green" in call
# Stage did NOT advance.
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'").fetchone()
conn.close()
assert task["stage"] == "development"
def test_gitea_webhook_pr():
"""PR event is accepted."""
resp = client.post(
@@ -301,3 +342,94 @@ def test_plane_webhook_event_logged():
conn.close()
assert event is not None
assert event["source"] == "plane"
# ---------------------------------------------------------------------------
# BUG 7: red CI on development must bounce the task back to the developer
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
# ---------------------------------------------------------------------------
def _ci_failure_payload():
return {
"state": "failure",
"branches": [{"name": "feature/ET-011-test"}],
"repository": {"name": "enduro-trails"},
}
def _mock_db_with_retry_count(count):
"""Build a get_db() mock whose retry_count query returns `count`."""
conn = MagicMock()
conn.execute.return_value.fetchone.return_value = {"cnt": count}
return conn
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_retries_developer_under_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
from src.webhooks.gitea import handle_ci_status
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(0)
mock_enqueue.return_value = 42
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure was still reported (Slava sees both the failure and the retry).
assert mock_qg.called
# developer was re-enqueued.
assert mock_enqueue.called
assert mock_enqueue.call_args[0][0] == "developer"
# No escalation.
assert not mock_err.called
# Stage stays on development — no update_task_stage in the CI-failure path.
assert not mock_update_stage.called
@patch("src.webhooks.gitea.notify_error")
@patch("src.webhooks.gitea.notify_qg_failure")
@patch("src.webhooks.gitea.enqueue_job")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_db")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_ci_failure_development_escalates_at_limit(
mock_proj, mock_task, mock_get_db, mock_update_stage,
mock_enqueue, mock_qg, mock_err,
):
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "development", "work_item_id": "ET-011",
}
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
asyncio.run(handle_ci_status(_ci_failure_payload()))
# QG failure still reported.
assert mock_qg.called
# developer NOT re-enqueued at the cap.
assert not mock_enqueue.called
# Escalation message mentions CI failure.
assert mock_err.called
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
assert "Max developer retries" in err_msg
assert "after CI failure" in err_msg
# Stage untouched.
assert not mock_update_stage.called