Compare commits
12 Commits
fix/gitea-
...
fix/observ
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61e26a8930 | ||
| 2629dffe1b | |||
|
|
e4a9c48395 | ||
| a0621b9952 | |||
|
|
3a285de11d | ||
| 7922f6b67b | |||
|
|
e15d339b14 | ||
| 994f73a78e | |||
|
|
90c9ffe839 | ||
| b6aa107f93 | |||
|
|
0b8013cb06 | ||
| b01643fcc3 |
@@ -699,12 +699,49 @@ class AgentLauncher:
|
||||
task_id, work_item_id = row[0], row[1]
|
||||
if not work_item_id:
|
||||
return
|
||||
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
|
||||
# Observability: every agent's finish comment links its artifact(s)
|
||||
# (reviewer->12-review, tester->13-test-report, deployer->14-deploy-log,
|
||||
# architect->ADR, developer->PR/branch). For the developer we resolve the
|
||||
# open PR number so the link points straight at it.
|
||||
pr_number = None
|
||||
if agent == "developer":
|
||||
pr_number = self._open_pr_number(repo, branch)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
usage_comment(
|
||||
agent,
|
||||
usage,
|
||||
repo=repo,
|
||||
branch=branch,
|
||||
work_item_id=work_item_id,
|
||||
pr_number=pr_number,
|
||||
),
|
||||
author=agent,
|
||||
)
|
||||
if agent == "deployer":
|
||||
plane_add_comment(
|
||||
work_item_id, task_summary_comment(task_id), author="deployer"
|
||||
)
|
||||
|
||||
def _open_pr_number(self, repo: str, branch: str):
|
||||
"""Return the open PR number for `branch`, or None. Never raises."""
|
||||
try:
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
resp = httpx.get(
|
||||
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=5,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
prs = resp.json()
|
||||
if prs:
|
||||
return prs[0].get("number")
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
|
||||
@@ -83,6 +83,12 @@ def init_db():
|
||||
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
||||
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
|
||||
# reports the real input split across input_tokens (fresh, ~tens) +
|
||||
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
|
||||
# (writing new cache). Without this column the cache_creation slice is lost
|
||||
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
||||
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -343,6 +343,17 @@ def set_issue_blocked(work_item_id: str, project_id: str = None):
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)
|
||||
|
||||
|
||||
def set_issue_done(work_item_id: str, project_id: str = None):
|
||||
"""Observability fix: force the issue into the TERMINAL Done state.
|
||||
|
||||
Used by the deploy->done success path so a completed task always reaches the
|
||||
terminal Plane state (it used to stick on In Progress because the merge
|
||||
webhook bypassed the stage engine). Uses the existing PLANE_STATES['done']
|
||||
UUID — the mapping itself is NOT changed.
|
||||
"""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["done"], project_id)
|
||||
|
||||
|
||||
def set_issue_in_progress(work_item_id: str, project_id: str = None):
|
||||
"""Set issue to 'In Progress' state — agent working."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)
|
||||
|
||||
@@ -249,9 +249,17 @@ def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = No
|
||||
|
||||
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
"""
|
||||
DEPRECATED: replaced by check_ci_green on the development stage (CI is now
|
||||
configured). Kept for backward-compat; not wired to any stage.
|
||||
|
||||
S-1 fix: run the project test suite locally and judge by exit code, instead of
|
||||
depending on Gitea CI (which is not configured -> always false).
|
||||
|
||||
БАГ 5 fix: invoke pytest directly instead of make test. make is not installed
|
||||
in the orchestrator container, so the previous ["make", "test"] call raised
|
||||
FileNotFoundError. This reproduces the Makefile test target 1:1
|
||||
(cd src/api && python -m pytest ../../tests/ -v).
|
||||
|
||||
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
|
||||
is safe for concurrent active tasks — no shared /repos checkout race.
|
||||
"""
|
||||
@@ -259,7 +267,8 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
try:
|
||||
repo_path = ensure_worktree(repo, branch)
|
||||
r = subprocess.run(
|
||||
["make", "test"], cwd=repo_path,
|
||||
["python", "-m", "pytest", "../../tests/", "-v"],
|
||||
cwd=os.path.join(repo_path, "src", "api"),
|
||||
capture_output=True, text=True, timeout=600,
|
||||
)
|
||||
if r.returncode == 0:
|
||||
@@ -272,6 +281,44 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
return False, f"Local test run error: {e}"
|
||||
|
||||
|
||||
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
|
||||
"""
|
||||
БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable
|
||||
verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code
|
||||
(which is always 0 on a successful agent session even when the deploy failed).
|
||||
|
||||
Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML
|
||||
frontmatter. Returns:
|
||||
(True, ...) -> deploy_status: SUCCESS
|
||||
(False, ...) -> deploy_status: FAILED, missing field, or no frontmatter
|
||||
"""
|
||||
import yaml
|
||||
repo_path = _repo_path(repo, branch)
|
||||
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
|
||||
|
||||
if not os.path.isfile(log_path):
|
||||
return False, "Deploy log not found (14-deploy-log.md)"
|
||||
try:
|
||||
with open(log_path, "r") as f:
|
||||
content = f.read()
|
||||
status = None
|
||||
if content.startswith("---"):
|
||||
parts = content.split("---", 2)
|
||||
if len(parts) >= 3:
|
||||
try:
|
||||
fm = yaml.safe_load(parts[1]) or {}
|
||||
except yaml.YAMLError as e:
|
||||
return False, f"Invalid YAML frontmatter in deploy log: {e}"
|
||||
status = str(fm.get("deploy_status", "")).upper().strip()
|
||||
if status == "SUCCESS":
|
||||
return True, "Deploy status: SUCCESS"
|
||||
if status == "FAILED":
|
||||
return False, "Deploy status: FAILED"
|
||||
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
|
||||
except OSError as e:
|
||||
return False, f"Error reading deploy log: {e}"
|
||||
|
||||
|
||||
# Registry for dynamic lookup by name
|
||||
QG_CHECKS = {
|
||||
"check_analysis_approved": check_analysis_approved,
|
||||
@@ -282,4 +329,5 @@ QG_CHECKS = {
|
||||
"check_tests_passed": check_tests_passed,
|
||||
"check_reviewer_verdict": check_reviewer_verdict,
|
||||
"check_tests_local": check_tests_local,
|
||||
"check_deploy_status": check_deploy_status,
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ from .plane_sync import (
|
||||
set_issue_needs_input,
|
||||
set_issue_in_progress,
|
||||
set_issue_blocked,
|
||||
set_issue_done,
|
||||
)
|
||||
from .config import settings
|
||||
|
||||
@@ -189,36 +190,48 @@ def advance_stage(
|
||||
|
||||
# --- Quality gate ----------------------------------------------------
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
||||
# Human-approval gate: split by path.
|
||||
if qg_name == "check_analysis_approved":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
# Launcher path (analyst just finished): set In Review + ask for
|
||||
# the Approved status. This gate never advances on its own -- a
|
||||
# human Approved verdict does that.
|
||||
if agent == "analyst":
|
||||
_handle_analysis_approved_flow(
|
||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||
)
|
||||
return result
|
||||
# Webhook Approved-verdict path (agent is None): the human flipped
|
||||
# the Plane status to Approved, which IS the approval. The gate is
|
||||
# satisfied -- do NOT re-run check_analysis_approved (it looks for
|
||||
# an :approved: *comment* and would block on a status-only
|
||||
# approval). Mark it passed and fall through to the Advance block.
|
||||
result.qg_name = qg_name
|
||||
result.qg_passed = True
|
||||
result.qg_reason = "approved-via-status"
|
||||
else:
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
|
||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||
result.qg_passed = passed
|
||||
result.qg_reason = reason
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
if not passed:
|
||||
logger.info(
|
||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||
)
|
||||
# Behaviour parity:
|
||||
# - webhook path (finished_agent is None): emit the generic
|
||||
# QG-failure notification, exactly like the old plane handler.
|
||||
# - launcher path (finished_agent set): NO generic notification;
|
||||
# the rollback branches below own their own messaging, exactly
|
||||
# like the old launcher handler.
|
||||
if agent is None:
|
||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
_handle_qg_failure_rollbacks(
|
||||
task_id, current_stage, repo, work_item_id, branch,
|
||||
agent, qg_name, reason, result,
|
||||
)
|
||||
return result
|
||||
|
||||
elif qg_name:
|
||||
# QG name set but not registered — do not advance (launcher behavior).
|
||||
@@ -235,6 +248,22 @@ def advance_stage(
|
||||
f"(auto-advance after {agent})"
|
||||
)
|
||||
|
||||
# --- Terminal sync: deploy -> done must reach Plane's Done -----------
|
||||
# When the deployer's check_deploy_status passes we advance to the
|
||||
# terminal 'done' stage. Previously a merged-PR webhook completed the
|
||||
# task out-of-band and Plane stuck on In Progress. Now done flows through
|
||||
# here, so explicitly drive the Plane issue into the terminal Done state
|
||||
# (PLANE_STATES['done'] — mapping unchanged) in addition to the
|
||||
# stage-change comment above.
|
||||
if next_stage == "done" and work_item_id:
|
||||
try:
|
||||
set_issue_done(work_item_id)
|
||||
logger.info(
|
||||
f"Task {task_id}: deploy->done, Plane state forced to Done"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Task {task_id}: failed to set Plane Done: {e}")
|
||||
|
||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||
next_agent = get_agent_for_stage(current_stage)
|
||||
if next_agent:
|
||||
@@ -478,3 +507,31 @@ def _handle_qg_failure_rollbacks(
|
||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
|
||||
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
|
||||
# The launcher's exit_code-based guard (launcher.py:475) never fires because
|
||||
# the LLM process exit code is always 0; this gate fires on the machine-readable
|
||||
# deploy_status verdict in 14-deploy-log.md instead. Mirrors the launcher block
|
||||
# (rollback + set_issue_blocked + notify) but is driven by the VERDICT.
|
||||
if agent == "deployer" and qg_name == "check_deploy_status":
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
set_issue_blocked(work_item_id)
|
||||
notify_qg_failure(task_id, "deploy", "check_deploy_status", reason)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c Deploy FAILED ({reason}). Rolled back to development. "
|
||||
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="deployer",
|
||||
)
|
||||
send_telegram(
|
||||
f"\U0001f6a8 {work_item_id}: Deploy FAILED ({reason}). "
|
||||
f"Rolled back to development. Needs fix."
|
||||
)
|
||||
result.alerted = True
|
||||
logger.error(
|
||||
f"Task {task_id}: deployer verdict FAILED, rolled back deploy -> "
|
||||
f"development ({reason})"
|
||||
)
|
||||
|
||||
@@ -13,10 +13,10 @@ STAGE_TRANSITIONS = {
|
||||
"created": {"next": "analysis", "agent": "analyst", "qg": None},
|
||||
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
|
||||
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_tests_local"},
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
|
||||
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
||||
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
||||
"deploy": {"next": "done", "agent": None, "qg": None},
|
||||
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
|
||||
"done": {"next": None, "agent": None, "qg": None},
|
||||
}
|
||||
|
||||
|
||||
171
src/usage.py
171
src/usage.py
@@ -31,7 +31,8 @@ def parse_usage_from_text(text: str) -> dict | None:
|
||||
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
||||
|
||||
Returns a normalised dict
|
||||
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
|
||||
{input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
|
||||
cost_usd}
|
||||
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
||||
"""
|
||||
if not text:
|
||||
@@ -71,6 +72,12 @@ def parse_usage_from_text(text: str) -> dict | None:
|
||||
"cache_read_tokens": _int(
|
||||
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
||||
),
|
||||
# The cache-CREATION slice (writing new cache entries) is part of the
|
||||
# REAL input and used to be dropped on the floor. Persist it so the
|
||||
# "X in" figure reflects the full prompt size, not just fresh tokens.
|
||||
"cache_creation_tokens": _int(
|
||||
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
|
||||
),
|
||||
"cost_usd": _float(cost),
|
||||
}
|
||||
|
||||
@@ -150,11 +157,12 @@ def record_usage(run_id: int, usage: dict | None):
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
||||
"cache_read_tokens=?, cost_usd=? WHERE id=?",
|
||||
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=? WHERE id=?",
|
||||
(
|
||||
usage.get("input_tokens"),
|
||||
usage.get("output_tokens"),
|
||||
usage.get("cache_read_tokens"),
|
||||
usage.get("cache_creation_tokens"),
|
||||
usage.get("cost_usd"),
|
||||
run_id,
|
||||
),
|
||||
@@ -197,19 +205,132 @@ AGENT_DISPLAY = {
|
||||
}
|
||||
|
||||
|
||||
def usage_comment(agent: str, usage: dict | None) -> str:
|
||||
def _input_total(usage: dict) -> int:
|
||||
"""FULL input = fresh input + cache-read + cache-creation tokens."""
|
||||
def _i(k):
|
||||
try:
|
||||
return int(usage.get(k) or 0)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens")
|
||||
|
||||
|
||||
def _cached_total(usage: dict) -> int:
|
||||
"""Cached portion of the input = cache-read + cache-creation tokens."""
|
||||
def _i(k):
|
||||
try:
|
||||
return int(usage.get(k) or 0)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
return _i("cache_read_tokens") + _i("cache_creation_tokens")
|
||||
|
||||
|
||||
def fmt_in(usage: dict) -> str:
|
||||
"""Render the input figure as full total with a cached breakdown.
|
||||
|
||||
'8.5M in (8.4M cached)' when there is a cache; '45.2k in' when cached==0.
|
||||
"""
|
||||
total = _input_total(usage)
|
||||
cached = _cached_total(usage)
|
||||
if cached > 0:
|
||||
return f"{fmt_tokens(total)} in ({fmt_tokens(cached)} cached)"
|
||||
return f"{fmt_tokens(total)} in"
|
||||
|
||||
|
||||
def usage_comment(
|
||||
agent: str,
|
||||
usage: dict | None,
|
||||
repo: str | None = None,
|
||||
branch: str | None = None,
|
||||
work_item_id: str | None = None,
|
||||
pr_number=None,
|
||||
) -> str:
|
||||
"""Build the per-agent finish comment, e.g.
|
||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
|
||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 8.5M in (8.4M cached) / 45.8k out \u00b7 $7.29'.
|
||||
|
||||
When repo/branch/work_item_id are supplied, the agent's artifact link(s) are
|
||||
appended (BUG: only analyst used to link its docs). Missing artifacts are
|
||||
silently skipped — link building never raises.
|
||||
"""
|
||||
usage = usage or {}
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
icon = AGENT_ICON.get(agent, "\u2705")
|
||||
return (
|
||||
line = (
|
||||
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
|
||||
f"{fmt_tokens(usage.get('input_tokens'))} in / "
|
||||
f"{fmt_in(usage)} / "
|
||||
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
|
||||
f"{fmt_cost(usage.get('cost_usd'))}"
|
||||
)
|
||||
links = artifact_links(agent, repo, branch, work_item_id, pr_number)
|
||||
if links:
|
||||
line += "\n" + "\n".join(links)
|
||||
return line
|
||||
|
||||
|
||||
# Per-agent artifact file under docs/work-items/{wid}/ (architect/developer use
|
||||
# special handling for ADR dirs / PR links, see artifact_links()).
|
||||
AGENT_ARTIFACT = {
|
||||
"reviewer": ("Review", "12-review.md"),
|
||||
"tester": ("Test report", "13-test-report.md"),
|
||||
"deployer": ("Deploy log", "14-deploy-log.md"),
|
||||
}
|
||||
|
||||
|
||||
def artifact_links(
|
||||
agent: str,
|
||||
repo: str | None,
|
||||
branch: str | None,
|
||||
work_item_id: str | None,
|
||||
pr_number=None,
|
||||
) -> list[str]:
|
||||
"""Markdown link(s) to the finishing agent's artifact(s) in Gitea.
|
||||
|
||||
Uses gitea_public_url (falls back to gitea_url) for clickable links, mirroring
|
||||
the analyst doc links. Returns [] (never raises) when there is nothing to
|
||||
link or the required context is missing. analyst is intentionally NOT handled
|
||||
here — its richer doc list lives in stage_engine._build_analyst_ready_comment.
|
||||
"""
|
||||
try:
|
||||
from .config import settings
|
||||
owner = getattr(settings, "gitea_owner", "admin")
|
||||
base = (
|
||||
getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "")
|
||||
).rstrip("/")
|
||||
if not base or not repo:
|
||||
return []
|
||||
links: list[str] = []
|
||||
|
||||
if agent == "developer":
|
||||
if branch:
|
||||
links.append(
|
||||
f"\U0001f4c2 [Branch {branch}]({base}/{owner}/{repo}/src/branch/{branch})"
|
||||
)
|
||||
if pr_number:
|
||||
links.append(
|
||||
f"\U0001f517 [PR #{pr_number}]({base}/{owner}/{repo}/pulls/{pr_number})"
|
||||
)
|
||||
return links
|
||||
|
||||
if agent == "architect":
|
||||
if branch and work_item_id:
|
||||
adr_dir = (
|
||||
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||
f"docs/work-items/{work_item_id}/06-adr"
|
||||
)
|
||||
links.append(f"\U0001f4d0 [ADR]({adr_dir})")
|
||||
return links
|
||||
|
||||
spec = AGENT_ARTIFACT.get(agent)
|
||||
if spec and branch and work_item_id:
|
||||
label, fname = spec
|
||||
href = (
|
||||
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||
f"docs/work-items/{work_item_id}/{fname}"
|
||||
)
|
||||
links.append(f"\U0001f4c4 [{label}]({href})")
|
||||
return links
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
AGENT_ICON = {
|
||||
@@ -225,13 +346,22 @@ AGENT_ICON = {
|
||||
def task_usage_summary(task_id: int) -> dict:
|
||||
"""Aggregate agent_runs usage for a task.
|
||||
|
||||
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
|
||||
total_in counts the FULL input (input + cache_read + cache_creation), and
|
||||
total_cached counts the cached portion (cache_read + cache_creation).
|
||||
COALESCE(...,0) keeps pre-existing rows (NULL cache_creation) from breaking.
|
||||
|
||||
Returns {total_in, total_cached, total_out, total_cost,
|
||||
per_agent: [(agent, in, cached, out, cost), ...]}.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT agent, "
|
||||
"COALESCE(SUM(input_tokens),0), "
|
||||
"COALESCE(SUM(input_tokens),0) "
|
||||
" + COALESCE(SUM(cache_read_tokens),0) "
|
||||
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||
"COALESCE(SUM(cache_read_tokens),0) "
|
||||
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||
"COALESCE(SUM(output_tokens),0), "
|
||||
"COALESCE(SUM(cost_usd),0.0) "
|
||||
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
|
||||
@@ -239,12 +369,14 @@ def task_usage_summary(task_id: int) -> dict:
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
|
||||
per_agent = [(r[0], int(r[1]), int(r[2]), int(r[3]), float(r[4])) for r in rows]
|
||||
total_in = sum(r[1] for r in per_agent)
|
||||
total_out = sum(r[2] for r in per_agent)
|
||||
total_cost = sum(r[3] for r in per_agent)
|
||||
total_cached = sum(r[2] for r in per_agent)
|
||||
total_out = sum(r[3] for r in per_agent)
|
||||
total_cost = sum(r[4] for r in per_agent)
|
||||
return {
|
||||
"total_in": total_in,
|
||||
"total_cached": total_cached,
|
||||
"total_out": total_out,
|
||||
"total_cost": total_cost,
|
||||
"per_agent": per_agent,
|
||||
@@ -254,15 +386,26 @@ def task_usage_summary(task_id: int) -> dict:
|
||||
def task_summary_comment(task_id: int) -> str:
|
||||
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
|
||||
s = task_usage_summary(task_id)
|
||||
cached = s.get("total_cached", 0)
|
||||
head_in = (
|
||||
f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434 ({fmt_tokens(cached)} cached)"
|
||||
if cached > 0
|
||||
else f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434"
|
||||
)
|
||||
lines = [
|
||||
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
|
||||
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
|
||||
f"{head_in} / "
|
||||
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
|
||||
f"{fmt_cost(s['total_cost'])}"
|
||||
]
|
||||
for agent, ti, to, cost in s["per_agent"]:
|
||||
for agent, ti, tc, to, cost in s["per_agent"]:
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
in_str = (
|
||||
f"{fmt_tokens(ti)} in ({fmt_tokens(tc)} cached)"
|
||||
if tc > 0
|
||||
else f"{fmt_tokens(ti)} in"
|
||||
)
|
||||
lines.append(
|
||||
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||
f"\u2022 {name}: {in_str} / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
@@ -216,12 +216,31 @@ async def handle_ci_status(payload: dict):
|
||||
else:
|
||||
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
|
||||
|
||||
elif state == "failure":
|
||||
# S-1: Gitea CI is NOT the authoritative gate anymore (the orchestrator runs
|
||||
# tests locally via check_tests_local). Gitea CI is often unconfigured, so a
|
||||
# "failure"/empty status here is not actionable. Log only, do not alert.
|
||||
logger.debug(f"Task {task_id}: Gitea CI state='failure' on branch '{branch}' "
|
||||
f"(non-authoritative, suppressed — local tests are the gate)")
|
||||
elif state == "failure" and current_stage == "development":
|
||||
# CI is the authoritative gate for development -> review.
|
||||
# On red CI: notify, then bounce the task back to the developer (capped retries),
|
||||
# symmetric to the review REQUEST_CHANGES path.
|
||||
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
|
||||
conn = get_db()
|
||||
retry_count = conn.execute(
|
||||
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
|
||||
(task_id,),
|
||||
).fetchone()["cnt"]
|
||||
conn.close()
|
||||
if retry_count < MAX_DEV_RETRIES:
|
||||
# task already on 'development' — no stage change needed, just relaunch developer
|
||||
try:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||
)
|
||||
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||
except Exception as e:
|
||||
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
|
||||
else:
|
||||
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
|
||||
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
|
||||
|
||||
|
||||
async def handle_pr(payload: dict):
|
||||
@@ -315,6 +334,20 @@ async def handle_pr(payload: dict):
|
||||
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
|
||||
|
||||
elif action == "closed" and pr.get("merged", False):
|
||||
# BUG 8 (second door): at the deploy stage `done` is gated by the
|
||||
# deployer's verdict (check_deploy_status via advance_stage), NOT by the
|
||||
# fact that the PR was merged. The deployer merges the PR at the START of
|
||||
# its run, so a merged webhook arrives ~30s later while the deployer is
|
||||
# still working — blindly setting done here would fake-complete the task
|
||||
# and discard a later deploy_status: FAILED verdict. advance_stage will
|
||||
# drive deploy→done (and Plane→Done) when the deployer job finishes.
|
||||
# For every OTHER stage the merge-driven done behaviour is preserved.
|
||||
if current_stage == "deploy":
|
||||
logger.info(
|
||||
f"Task {task_id}: PR merged at deploy stage — done gated by "
|
||||
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
|
||||
)
|
||||
return
|
||||
update_task_stage(task_id, "done")
|
||||
notify_stage_change(task_id, current_stage, "done")
|
||||
logger.info(f"Task {task_id}: PR merged, stage → done")
|
||||
|
||||
116
tests/test_qg.py
116
tests/test_qg.py
@@ -17,7 +17,10 @@ from src.qg.checks import (
|
||||
check_ci_green,
|
||||
check_review_approved,
|
||||
check_tests_passed,
|
||||
check_tests_local,
|
||||
check_deploy_status,
|
||||
)
|
||||
from src.stages import get_qg_for_stage
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -186,3 +189,116 @@ class TestCheckTestsPassed:
|
||||
passed, reason = check_tests_passed("enduro-trails", "ET-001")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
|
||||
class TestCheckDeployStatus:
|
||||
"""BUG 8: deploy -> done must be gated on the deployer's machine-readable
|
||||
deploy_status verdict in 14-deploy-log.md frontmatter, NOT the LLM exit code
|
||||
(always 0). Mirrors check_reviewer_verdict (reads ONLY the frontmatter field)."""
|
||||
|
||||
def _write_log(self, repo_dir, content):
|
||||
wi_dir = repo_dir / "docs" / "work-items" / "ET-011"
|
||||
wi_dir.mkdir(parents=True)
|
||||
(wi_dir / "14-deploy-log.md").write_text(content)
|
||||
|
||||
def test_success_verdict_passes(self, setup_work_item_dir):
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is True
|
||||
assert "SUCCESS" in reason
|
||||
|
||||
def test_failed_verdict_fails(self, setup_work_item_dir):
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\ndeploy_status: FAILED\nversion: v0.0.3\n---\n\npermission denied.\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
assert "FAILED" in reason
|
||||
|
||||
def test_no_file_fails(self, setup_work_item_dir):
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
def test_no_field_fails(self, setup_work_item_dir):
|
||||
# Frontmatter present but no deploy_status field -> must NOT pass.
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\nversion: v0.0.3\n---\n\nStatus: FAILED (prose only).\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
|
||||
def test_prose_only_no_frontmatter_fails(self, setup_work_item_dir):
|
||||
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"# Deploy log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
|
||||
def test_deploy_stage_qg_is_check_deploy_status(self):
|
||||
assert get_qg_for_stage("deploy") == "check_deploy_status"
|
||||
|
||||
def test_registered_in_qg_checks(self):
|
||||
from src.qg.checks import QG_CHECKS
|
||||
assert QG_CHECKS.get("check_deploy_status") is check_deploy_status
|
||||
|
||||
|
||||
class TestDevelopmentStageQG:
|
||||
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
|
||||
gate), not the deprecated check_tests_local."""
|
||||
|
||||
def test_development_qg_is_check_ci_green(self):
|
||||
assert get_qg_for_stage("development") == "check_ci_green"
|
||||
|
||||
def test_check_tests_local_is_deprecated_and_unwired(self):
|
||||
# Kept in the registry for backward-compat, but not wired to any stage.
|
||||
from src.qg.checks import QG_CHECKS
|
||||
from src.stages import STAGE_TRANSITIONS
|
||||
assert "check_tests_local" in QG_CHECKS
|
||||
wired = {t.get("qg") for t in STAGE_TRANSITIONS.values()}
|
||||
assert "check_tests_local" not in wired
|
||||
|
||||
|
||||
class TestCheckTestsLocal:
|
||||
"""BUG 5: check_tests_local must run pytest directly (not make, which is
|
||||
not installed in the orchestrator container)."""
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_passes_on_returncode_zero(self, mock_run, mock_wt, tmp_path):
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
|
||||
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
assert passed is True
|
||||
assert reason == "Local tests passed"
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_fails_on_nonzero_returncode(self, mock_run, mock_wt, tmp_path):
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="boom", stderr="trace")
|
||||
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
assert passed is False
|
||||
assert "Local tests failed" in reason
|
||||
|
||||
@patch("src.qg.checks.ensure_worktree")
|
||||
@patch("subprocess.run")
|
||||
def test_invokes_pytest_not_make(self, mock_run, mock_wt, tmp_path):
|
||||
"""The subprocess call must be pytest, from src/api, against ../../tests/."""
|
||||
mock_wt.return_value = str(tmp_path)
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
|
||||
check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||
args, kwargs = mock_run.call_args
|
||||
cmd = args[0]
|
||||
assert "make" not in cmd
|
||||
assert cmd[:3] == ["python", "-m", "pytest"]
|
||||
assert "../../tests/" in cmd
|
||||
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")
|
||||
|
||||
|
||||
@@ -69,6 +69,7 @@ def silence_side_effects(monkeypatch):
|
||||
"set_issue_needs_input",
|
||||
"set_issue_in_progress",
|
||||
"set_issue_blocked",
|
||||
"set_issue_done",
|
||||
):
|
||||
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||
|
||||
@@ -177,6 +178,40 @@ class TestHappyPathAgentSelection:
|
||||
assert res.enqueued_agent is None
|
||||
assert _jobs() == []
|
||||
|
||||
def test_deploy_success_syncs_plane_to_terminal_done(self, monkeypatch):
|
||||
"""FIX 3: a successful deploy->done forces the Plane issue to terminal Done.
|
||||
|
||||
Previously the task could stick on In Progress because the merge webhook
|
||||
completed it out-of-band. Now the engine drives set_issue_done() on the
|
||||
deploy->done success transition.
|
||||
"""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("deploy", wi="ET-012")
|
||||
res = advance_stage(
|
||||
task_id, "deploy", "enduro-trails", "ET-012",
|
||||
"feature/ET-012-x", finished_agent="deployer",
|
||||
)
|
||||
assert res.advanced is True
|
||||
assert _stage(task_id) == "done"
|
||||
# The terminal Plane sync was invoked with the work item id.
|
||||
stage_engine.set_issue_done.assert_called_once_with("ET-012")
|
||||
|
||||
def test_non_terminal_advance_does_not_force_plane_done(self, monkeypatch):
|
||||
"""set_issue_done must only fire on the terminal deploy->done transition."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
advance_stage(
|
||||
task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
stage_engine.set_issue_done.assert_not_called()
|
||||
|
||||
def test_done_is_terminal(self):
|
||||
task_id = _make_task("done")
|
||||
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||
@@ -203,10 +238,13 @@ class TestQgFailureDoesNotAdvance:
|
||||
assert _jobs() == []
|
||||
|
||||
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity).
|
||||
|
||||
development stage QG is now check_ci_green (was check_tests_local).
|
||||
"""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
||||
{**stage_engine.QG_CHECKS, "check_ci_green": _fail("ci red")},
|
||||
)
|
||||
task_id = _make_task("development")
|
||||
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||
@@ -297,6 +335,59 @@ class TestTesterFail:
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 8: deploy verdict gates deploy -> done (not the LLM exit code)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDeployVerdict:
|
||||
"""deploy -> done must be gated on check_deploy_status (the deployer's
|
||||
machine-readable verdict), NOT on the LLM exit code (always 0)."""
|
||||
|
||||
def test_failed_verdict_rolls_back_to_development(self, monkeypatch):
|
||||
# deployer finished (exit_code 0 from launcher), but verdict is FAILED.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _fail("Deploy status: FAILED")},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development" # NOT done
|
||||
assert res.alerted is True
|
||||
assert stage_engine.set_issue_blocked.called
|
||||
assert stage_engine.send_telegram.called
|
||||
|
||||
def test_no_deploy_log_rolls_back(self, monkeypatch):
|
||||
# No frontmatter field / no file -> check returns False -> rollback.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _fail("Deploy log not found (14-deploy-log.md)")},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "development"
|
||||
|
||||
def test_success_verdict_advances_to_done(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _pass},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == "done"
|
||||
assert _stage(task_id) == "done"
|
||||
assert res.enqueued_agent is None # no agent leaves deploy
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -358,6 +449,63 @@ class TestAnalysisApprovedFlow:
|
||||
assert stage_engine.notify_approve_requested.called
|
||||
assert _jobs() == []
|
||||
|
||||
def test_approved_verdict_advances_analysis_to_architecture(self, monkeypatch):
|
||||
"""BUG 4: a human Approved STATUS (webhook path, finished_agent=None)
|
||||
must satisfy the analysis gate and advance analysis -> architecture,
|
||||
enqueuing the architect. The status-only approval must NOT re-run
|
||||
check_analysis_approved (which looks for an :approved: COMMENT and would
|
||||
otherwise wrongly block the advance).
|
||||
"""
|
||||
# Make check_analysis_approved FAIL if it is ever called: the webhook
|
||||
# path must bypass it entirely (status == approval). If the engine were
|
||||
# to re-run the gate, this would block the advance and fail the test.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{
|
||||
**stage_engine.QG_CHECKS,
|
||||
"check_analysis_approved": _fail("no :approved: comment"),
|
||||
},
|
||||
)
|
||||
# Guard: the approval-flow (launcher-only) must NOT be invoked here.
|
||||
flow = MagicMock()
|
||||
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(
|
||||
task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == "architecture"
|
||||
assert _stage(task_id) == "architecture"
|
||||
assert res.enqueued_agent == "architect"
|
||||
# Sanity: agent for analysis is architect, never analyst (no re-run loop).
|
||||
assert get_agent_for_stage("analysis") == "architect"
|
||||
jobs = _jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["agent"] == "architect"
|
||||
# The launcher-only approval-flow was NOT called on the webhook path.
|
||||
flow.assert_not_called()
|
||||
|
||||
def test_launcher_path_does_not_advance_and_calls_flow(self, monkeypatch):
|
||||
"""Regression: the launcher path (finished_agent='analyst') still routes
|
||||
into _handle_analysis_approved_flow and does NOT advance.
|
||||
"""
|
||||
flow = MagicMock()
|
||||
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||
|
||||
task_id = _make_task("analysis")
|
||||
res = advance_stage(
|
||||
task_id, "analysis", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent="analyst",
|
||||
)
|
||||
|
||||
assert res.advanced is not True
|
||||
assert _stage(task_id) == "analysis"
|
||||
assert _jobs() == []
|
||||
flow.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# launcher + plane both delegate to the engine
|
||||
|
||||
@@ -62,9 +62,27 @@ def test_parse_real_result_json():
|
||||
assert u["input_tokens"] == 45231
|
||||
assert u["output_tokens"] == 12100
|
||||
assert u["cache_read_tokens"] == 18500
|
||||
# FIX 2: cache_creation slice must now be parsed (was dropped before).
|
||||
assert u["cache_creation_tokens"] == 7418
|
||||
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
def test_parse_cache_creation_present():
|
||||
u = U.parse_usage_from_text(REAL_RESULT_JSON)
|
||||
assert u["cache_creation_tokens"] == 7418
|
||||
|
||||
|
||||
def test_parse_cache_creation_missing_defaults_zero():
|
||||
blob = (
|
||||
'{"total_cost_usd":0.01,'
|
||||
'"usage":{"input_tokens":10,"output_tokens":5,'
|
||||
'"cache_read_input_tokens":100}}'
|
||||
)
|
||||
u = U.parse_usage_from_text(blob)
|
||||
assert u["cache_creation_tokens"] == 0
|
||||
assert u["cache_read_tokens"] == 100
|
||||
|
||||
|
||||
def test_parse_with_leading_text():
|
||||
"""The agent may print text before the trailing JSON; we still find it."""
|
||||
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
|
||||
@@ -106,13 +124,16 @@ def test_record_usage_writes_columns():
|
||||
U.record_usage(rid, u)
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
|
||||
"SELECT input_tokens, output_tokens, cache_read_tokens, "
|
||||
"cache_creation_tokens, cost_usd "
|
||||
"FROM agent_runs WHERE id=?", (rid,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert row["input_tokens"] == 45231
|
||||
assert row["output_tokens"] == 12100
|
||||
assert row["cache_read_tokens"] == 18500
|
||||
# FIX 2: cache_creation column is now persisted.
|
||||
assert row["cache_creation_tokens"] == 7418
|
||||
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
@@ -144,14 +165,82 @@ def test_fmt_cost():
|
||||
|
||||
|
||||
def test_usage_comment_format():
|
||||
# No cache -> in_total == input_tokens, no cached breakdown shown.
|
||||
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
|
||||
c = U.usage_comment("developer", u)
|
||||
assert "Developer" in c
|
||||
assert "45.2k in" in c
|
||||
assert "cached" not in c
|
||||
assert "12.1k out" in c
|
||||
assert "$0.21" in c
|
||||
|
||||
|
||||
def test_usage_comment_shows_full_input_with_cached():
|
||||
"""FIX 2: in = input + cache_read + cache_creation, with cached breakdown."""
|
||||
u = {
|
||||
"input_tokens": 81,
|
||||
"cache_read_tokens": 8_400_000,
|
||||
"cache_creation_tokens": 100_000,
|
||||
"output_tokens": 45_800,
|
||||
"cost_usd": 7.29,
|
||||
}
|
||||
c = U.usage_comment("developer", u)
|
||||
# total in = 8_500_081 -> 8.5M ; cached = 8_500_000 -> 8.5M
|
||||
assert "8.5M in (8.5M cached)" in c
|
||||
assert "45.8k out" in c
|
||||
assert "$7.29" in c
|
||||
|
||||
|
||||
def test_usage_comment_no_cached_when_zero():
|
||||
u = {"input_tokens": 1234, "cache_read_tokens": 0,
|
||||
"cache_creation_tokens": 0, "output_tokens": 50, "cost_usd": 0.01}
|
||||
c = U.usage_comment("developer", u)
|
||||
assert "1.2k in" in c
|
||||
assert "cached" not in c
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# FIX 4: per-agent artifact links in finish comments
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _ctx():
|
||||
return dict(repo="enduro-trails", branch="feature/ET-012-x",
|
||||
work_item_id="ET-012")
|
||||
|
||||
|
||||
def test_usage_comment_reviewer_links_review_doc():
|
||||
c = U.usage_comment("reviewer", {"input_tokens": 5}, **_ctx())
|
||||
assert "12-review.md" in c
|
||||
assert "ET-012" in c
|
||||
|
||||
|
||||
def test_usage_comment_tester_links_test_report():
|
||||
c = U.usage_comment("tester", {"input_tokens": 5}, **_ctx())
|
||||
assert "13-test-report.md" in c
|
||||
|
||||
|
||||
def test_usage_comment_deployer_links_deploy_log():
|
||||
c = U.usage_comment("deployer", {"input_tokens": 5}, **_ctx())
|
||||
assert "14-deploy-log.md" in c
|
||||
|
||||
|
||||
def test_usage_comment_developer_links_pr_and_branch():
|
||||
c = U.usage_comment("developer", {"input_tokens": 5}, pr_number=7, **_ctx())
|
||||
assert "pulls/7" in c
|
||||
assert "feature/ET-012-x" in c
|
||||
|
||||
|
||||
def test_usage_comment_architect_links_adr():
|
||||
c = U.usage_comment("architect", {"input_tokens": 5}, **_ctx())
|
||||
assert "06-adr" in c
|
||||
|
||||
|
||||
def test_usage_comment_no_links_without_context():
|
||||
"""Without repo/branch context, no links are appended (no crash)."""
|
||||
c = U.usage_comment("reviewer", {"input_tokens": 5})
|
||||
assert "12-review.md" not in c
|
||||
assert "http" not in c
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# task summary
|
||||
# --------------------------------------------------------------------------- #
|
||||
@@ -174,3 +263,47 @@ def test_task_summary_aggregates_over_agents():
|
||||
assert "$0.15" in comment # total cost
|
||||
assert "Developer" in comment
|
||||
assert "Tester" in comment
|
||||
|
||||
|
||||
def test_task_summary_sums_all_three_input_components():
|
||||
"""FIX 2: total_in = SUM(input + cache_read + cache_creation); total_cached too."""
|
||||
rid = _new_run(agent="developer", task_id=77)
|
||||
U.record_usage(rid, {
|
||||
"input_tokens": 100,
|
||||
"cache_read_tokens": 2000,
|
||||
"cache_creation_tokens": 900,
|
||||
"output_tokens": 50,
|
||||
"cost_usd": 0.10,
|
||||
})
|
||||
rid2 = _new_run(agent="tester", task_id=77)
|
||||
U.record_usage(rid2, {
|
||||
"input_tokens": 10,
|
||||
"cache_read_tokens": 500,
|
||||
"cache_creation_tokens": 0,
|
||||
"output_tokens": 5,
|
||||
"cost_usd": 0.05,
|
||||
})
|
||||
s = U.task_usage_summary(77)
|
||||
# total_in = (100+2000+900) + (10+500+0) = 3510
|
||||
assert s["total_in"] == 3510
|
||||
# total_cached = (2000+900) + (500+0) = 3400
|
||||
assert s["total_cached"] == 3400
|
||||
assert s["total_out"] == 55
|
||||
comment = U.task_summary_comment(77)
|
||||
assert "cached" in comment
|
||||
|
||||
|
||||
def test_task_summary_handles_null_cache_creation():
|
||||
"""Pre-existing rows (NULL cache_creation) must not break aggregation."""
|
||||
rid = _new_run(agent="developer", task_id=88)
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=100, cache_read_tokens=200, "
|
||||
"cache_creation_tokens=NULL, output_tokens=10, cost_usd=0.01 WHERE id=?",
|
||||
(rid,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
s = U.task_usage_summary(88) # must not raise
|
||||
assert s["total_in"] == 300 # 100 + 200 + (NULL->0)
|
||||
assert s["total_cached"] == 200
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import pytest
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch, MagicMock, AsyncMock
|
||||
@@ -272,6 +273,46 @@ def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
|
||||
assert task["stage"] == "review"
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.launcher")
|
||||
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
|
||||
"""BUG 6: CI failure at development is now the authoritative QG gate failing.
|
||||
|
||||
It must notify QG failure (not silently suppress) and must NOT advance the stage.
|
||||
"""
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
|
||||
("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
resp = client.post(
|
||||
"/webhook/gitea",
|
||||
json={
|
||||
"state": "failure",
|
||||
"branches": [{"name": "feature/ET-011-test"}],
|
||||
"repository": {"name": "enduro-trails"},
|
||||
},
|
||||
headers={"X-Gitea-Event": "status"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# QG failure was reported for the development stage with check_ci_green.
|
||||
assert mock_notify.called
|
||||
args, kwargs = mock_notify.call_args
|
||||
call = list(args) + list(kwargs.values())
|
||||
assert "development" in call
|
||||
assert "check_ci_green" in call
|
||||
|
||||
# Stage did NOT advance.
|
||||
conn = get_db()
|
||||
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'").fetchone()
|
||||
conn.close()
|
||||
assert task["stage"] == "development"
|
||||
|
||||
|
||||
def test_gitea_webhook_pr():
|
||||
"""PR event is accepted."""
|
||||
resp = client.post(
|
||||
@@ -301,3 +342,158 @@ def test_plane_webhook_event_logged():
|
||||
conn.close()
|
||||
assert event is not None
|
||||
assert event["source"] == "plane"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 7: red CI on development must bounce the task back to the developer
|
||||
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
|
||||
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
|
||||
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ci_failure_payload():
|
||||
return {
|
||||
"state": "failure",
|
||||
"branches": [{"name": "feature/ET-011-test"}],
|
||||
"repository": {"name": "enduro-trails"},
|
||||
}
|
||||
|
||||
|
||||
def _mock_db_with_retry_count(count):
|
||||
"""Build a get_db() mock whose retry_count query returns `count`."""
|
||||
conn = MagicMock()
|
||||
conn.execute.return_value.fetchone.return_value = {"cnt": count}
|
||||
return conn
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_error")
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.enqueue_job")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_db")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_ci_failure_development_retries_developer_under_limit(
|
||||
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||
mock_enqueue, mock_qg, mock_err,
|
||||
):
|
||||
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
|
||||
from src.webhooks.gitea import handle_ci_status
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||
}
|
||||
mock_get_db.return_value = _mock_db_with_retry_count(0)
|
||||
mock_enqueue.return_value = 42
|
||||
|
||||
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||
|
||||
# QG failure was still reported (Slava sees both the failure and the retry).
|
||||
assert mock_qg.called
|
||||
# developer was re-enqueued.
|
||||
assert mock_enqueue.called
|
||||
assert mock_enqueue.call_args[0][0] == "developer"
|
||||
# No escalation.
|
||||
assert not mock_err.called
|
||||
# Stage stays on development — no update_task_stage in the CI-failure path.
|
||||
assert not mock_update_stage.called
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_error")
|
||||
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||
@patch("src.webhooks.gitea.enqueue_job")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_db")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_ci_failure_development_escalates_at_limit(
|
||||
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||
mock_enqueue, mock_qg, mock_err,
|
||||
):
|
||||
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
|
||||
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||
}
|
||||
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
|
||||
|
||||
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||
|
||||
# QG failure still reported.
|
||||
assert mock_qg.called
|
||||
# developer NOT re-enqueued at the cap.
|
||||
assert not mock_enqueue.called
|
||||
# Escalation message mentions CI failure.
|
||||
assert mock_err.called
|
||||
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
|
||||
assert "Max developer retries" in err_msg
|
||||
assert "after CI failure" in err_msg
|
||||
# Stage untouched.
|
||||
assert not mock_update_stage.called
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 8 (second door): a merged-PR webhook must NOT fake-complete a task that is
|
||||
# still in the deploy stage. On `deploy` done is gated by the deployer's verdict
|
||||
# (check_deploy_status via advance_stage), not by the merge event. For every
|
||||
# other stage the merge->done behaviour is preserved. Pure-logic tests: invoke
|
||||
# handle_pr() directly with mocked helpers (no HMAC barrier).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _merged_pr_payload(branch="feature/ET-012-x"):
|
||||
return {
|
||||
"action": "closed",
|
||||
"pull_request": {
|
||||
"merged": True,
|
||||
"number": 7,
|
||||
"head": {"ref": branch},
|
||||
},
|
||||
"repository": {"name": "enduro-trails"},
|
||||
}
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_stage_change")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_merge_on_deploy_stage_does_not_set_done(
|
||||
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||
):
|
||||
"""FIX 1: merge at deploy stage is ignored — done is gated by deployer verdict."""
|
||||
from src.webhooks.gitea import handle_pr
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "deploy", "work_item_id": "ET-012",
|
||||
}
|
||||
|
||||
asyncio.run(handle_pr(_merged_pr_payload()))
|
||||
|
||||
# The merge-driven done path must NOT run on deploy.
|
||||
assert not mock_update_stage.called
|
||||
assert not mock_notify.called
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_stage_change")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_merge_on_non_deploy_stage_sets_done(
|
||||
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||
):
|
||||
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
|
||||
from src.webhooks.gitea import handle_pr
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 2, "stage": "review", "work_item_id": "ET-013",
|
||||
}
|
||||
|
||||
asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))
|
||||
|
||||
# Non-deploy stages still get the merge-driven done.
|
||||
mock_update_stage.assert_called_once_with(2, "done")
|
||||
assert mock_notify.called
|
||||
|
||||
Reference in New Issue
Block a user