Compare commits
28 Commits
fix/pipeli
...
fix/deploy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e4cc6c724 | ||
| b222d7af27 | |||
|
|
ec9aa74492 | ||
| 3e5c74ce4f | |||
|
|
9a0298de9d | ||
| 2801983d7b | |||
|
|
61e26a8930 | ||
| 2629dffe1b | |||
|
|
e4a9c48395 | ||
| a0621b9952 | |||
|
|
3a285de11d | ||
| 7922f6b67b | |||
|
|
e15d339b14 | ||
| 994f73a78e | |||
|
|
90c9ffe839 | ||
| b6aa107f93 | |||
|
|
0b8013cb06 | ||
| b01643fcc3 | |||
|
|
ca63bc26bb | ||
| dce9ac806b | |||
|
|
a9cdb17614 | ||
|
|
96c5e6b2f9 | ||
|
|
b91be74692 | ||
| 2d392b6fc7 | |||
|
|
857bad314c | ||
|
|
c4be50ee20 | ||
|
|
6b3e144949 | ||
| cd73c75cda |
@@ -699,12 +699,49 @@ class AgentLauncher:
|
|||||||
task_id, work_item_id = row[0], row[1]
|
task_id, work_item_id = row[0], row[1]
|
||||||
if not work_item_id:
|
if not work_item_id:
|
||||||
return
|
return
|
||||||
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
|
# Observability: every agent's finish comment links its artifact(s)
|
||||||
|
# (reviewer->12-review, tester->13-test-report, deployer->14-deploy-log,
|
||||||
|
# architect->ADR, developer->PR/branch). For the developer we resolve the
|
||||||
|
# open PR number so the link points straight at it.
|
||||||
|
pr_number = None
|
||||||
|
if agent == "developer":
|
||||||
|
pr_number = self._open_pr_number(repo, branch)
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
usage_comment(
|
||||||
|
agent,
|
||||||
|
usage,
|
||||||
|
repo=repo,
|
||||||
|
branch=branch,
|
||||||
|
work_item_id=work_item_id,
|
||||||
|
pr_number=pr_number,
|
||||||
|
),
|
||||||
|
author=agent,
|
||||||
|
)
|
||||||
if agent == "deployer":
|
if agent == "deployer":
|
||||||
plane_add_comment(
|
plane_add_comment(
|
||||||
work_item_id, task_summary_comment(task_id), author="deployer"
|
work_item_id, task_summary_comment(task_id), author="deployer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _open_pr_number(self, repo: str, branch: str):
    """Return the number of the open PR whose head branch is `branch`, or None.

    Best-effort lookup against the Gitea pulls endpoint. Any failure (network
    error, non-200 response, unexpected payload) yields None so the caller can
    still post its comment without a PR link. Never raises.

    Args:
        repo: Repository name under ``settings.gitea_owner``.
        branch: Head branch the PR must originate from.

    Returns:
        The PR number of the first open PR whose ``head.ref`` equals
        ``branch``, or None when no such PR is found.
    """
    try:
        import httpx

        owner = settings.gitea_owner
        headers = {"Authorization": f"token {settings.gitea_token}"}
        resp = httpx.get(
            f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
            params={"state": "open", "head": branch},
            headers=headers,
            timeout=5,
        )
        if resp.status_code != 200:
            return None
        # Do not rely on the server applying the `head` filter: if it ignores
        # the parameter, the first element is just "some open PR". Match the
        # head branch ourselves so the link can never point at another task.
        # NOTE(review): the Gitea list-pulls endpoint does not appear to
        # document a `head` query parameter -- confirm against the deployed
        # version. Only the first page of results is inspected.
        for pr in resp.json() or []:
            if (pr.get("head") or {}).get("ref") == branch:
                return pr.get("number")
    except Exception:
        # Deliberate best-effort: a missing PR link must not break the
        # finish-comment flow.
        pass
    return None
|
||||||
|
|
||||||
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
||||||
import httpx
|
import httpx
|
||||||
owner = settings.gitea_owner
|
owner = settings.gitea_owner
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ class Settings(BaseSettings):
|
|||||||
|
|
||||||
# Gitea
|
# Gitea
|
||||||
gitea_url: str = "http://localhost:3000"
|
gitea_url: str = "http://localhost:3000"
|
||||||
|
gitea_public_url: str = "" # external URL for clickable links in comments; falls back to gitea_url
|
||||||
gitea_token: str = ""
|
gitea_token: str = ""
|
||||||
gitea_webhook_secret: str = ""
|
gitea_webhook_secret: str = ""
|
||||||
gitea_owner: str = "admin"
|
gitea_owner: str = "admin"
|
||||||
|
|||||||
107
src/db.py
107
src/db.py
@@ -83,7 +83,32 @@ def init_db():
|
|||||||
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
||||||
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
||||||
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
||||||
|
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
|
||||||
|
# reports the real input split across input_tokens (fresh, ~tens) +
|
||||||
|
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
|
||||||
|
# (writing new cache). Without this column the cache_creation slice is lost
|
||||||
|
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
||||||
|
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
||||||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||||||
|
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
|
||||||
|
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
|
||||||
|
# can render a short model tag per stage. Parsed from the run-log result JSON
|
||||||
|
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
|
||||||
|
_ensure_column(conn, "agent_runs", "model", "TEXT")
|
||||||
|
# Telegram live tracker: one editable Telegram message per task. We store its
|
||||||
|
# message_id so each stage transition can editMessageText the same message
|
||||||
|
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
|
||||||
|
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
|
||||||
|
# Telegram live tracker: human-readable task title for the tracker header
|
||||||
|
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
|
||||||
|
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
|
||||||
|
_ensure_column(conn, "tasks", "title", "TEXT")
|
||||||
|
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
|
||||||
|
# between "BRD ready / approve requested" and the analysis->architecture
|
||||||
|
# advance (human flipped Plane to Approved). Persisted on the task so the
|
||||||
|
# tracker can show "твоё время" without recomputing from activity history.
|
||||||
|
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
||||||
|
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -131,6 +156,71 @@ def update_task_stage(task_id: int, stage: str):
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Telegram live tracker helpers (feat/telegram-live-tracker)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def get_tracker_message_id(task_id: int) -> int | None:
    """Look up the Telegram tracker message_id saved for `task_id`.

    Returns None when the task row does not exist or no id has been stored.
    The connection is closed even if the query raises.
    """
    conn = get_db()
    try:
        cursor = conn.execute(
            "SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
        )
        record = cursor.fetchone()
    finally:
        conn.close()
    if not record:
        return None
    # A NULL column comes back as None, which is already the "not set" answer.
    return record[0]
|
||||||
|
|
||||||
|
|
||||||
|
def set_tracker_message_id(task_id: int, message_id: int) -> None:
    """Store `message_id` as the tracker message for `task_id`.

    Overwrites whatever value was stored before, so repeated calls are safe.
    The connection is closed even if the statement or commit raises.
    """
    conn = get_db()
    try:
        statement = "UPDATE tasks SET tracker_message_id=? WHERE id=?"
        conn.execute(statement, (message_id, task_id))
        conn.commit()
    finally:
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def mark_brd_review_started(task_id: int) -> None:
    """Record the start of the BRD review (human approve gate) for a task.

    The timestamp is written only while the column is still NULL, so calling
    this again (for example after a retried analyst run) does not reset the
    clock. The connection is closed even if the statement raises.
    """
    statement = (
        "UPDATE tasks SET brd_review_started_at=datetime('now') "
        "WHERE id=? AND brd_review_started_at IS NULL"
    )
    conn = get_db()
    try:
        conn.execute(statement, (task_id,))
        conn.commit()
    finally:
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def mark_brd_review_ended(task_id: int) -> None:
    """Record the end of the BRD review for a task.

    The timestamp is written only when a start timestamp exists and no end
    timestamp has been stored yet, so repeated calls keep the first value.
    The connection is closed even if the statement raises.
    """
    statement = (
        "UPDATE tasks SET brd_review_ended_at=datetime('now') "
        "WHERE id=? AND brd_review_started_at IS NOT NULL "
        "AND brd_review_ended_at IS NULL"
    )
    conn = get_db()
    try:
        conn.execute(statement, (task_id,))
        conn.commit()
    finally:
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||||
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
||||||
|
|
||||||
@@ -351,6 +441,23 @@ def mark_job(
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def has_active_job_for_task(task_id: int) -> bool:
    """Return True if the task already has a queued or running job.

    Used by the status-only verdict model (handle_status_start) to guard against
    double-launching an agent when a duplicate In Progress webhook arrives or a
    job is still in flight. The events de-dup absorbs identical webhook bodies;
    this guards against distinct webhooks while a job is pending/running.

    Args:
        task_id: Primary key of the task to check.

    Returns:
        True when at least one row in ``jobs`` for this task has status
        'queued' or 'running', otherwise False.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
            (task_id,),
        ).fetchone()
    finally:
        # Close on the error path too, so a failing query cannot leak the
        # connection (same pattern as the tracker helpers in this module).
        conn.close()
    return row is not None
|
||||||
|
|
||||||
|
|
||||||
def count_running_jobs() -> int:
|
def count_running_jobs() -> int:
|
||||||
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
|
|||||||
@@ -1,6 +1,24 @@
|
|||||||
"""Notifications and logging for orchestrator events."""
|
"""Notifications and logging for orchestrator events.
|
||||||
|
|
||||||
|
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
|
||||||
|
messages per task (agent start / finish / stage transition / QG-pending / tech
|
||||||
|
noise), the orchestrator now maintains ONE live tracker message per task that is
|
||||||
|
edited in place (editMessageText) on every stage transition. Only events that
|
||||||
|
NEED Slava's attention are sent as SEPARATE, notifying messages:
|
||||||
|
|
||||||
|
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
|
||||||
|
* deploy failed / rolled back — send_telegram from launcher/engine
|
||||||
|
* agent failed (exit_code != 0) — send_telegram from launcher
|
||||||
|
* task error (notify_error)
|
||||||
|
|
||||||
|
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
|
||||||
|
agent-start, agent-finish and QG-pending no longer emit their own messages — they
|
||||||
|
just refresh the tracker (or are log-only).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import html
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
logger = logging.getLogger("orchestrator")
|
logger = logging.getLogger("orchestrator")
|
||||||
@@ -17,25 +35,115 @@ def _get_settings():
|
|||||||
return _settings
|
return _settings
|
||||||
|
|
||||||
|
|
||||||
def send_telegram(text: str):
|
# --------------------------------------------------------------------------- #
|
||||||
"""Send notification to Telegram. Fire-and-forget, never raises."""
|
# Low-level Telegram primitives
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
|
||||||
|
def send_telegram(text: str, disable_notification: bool = False):
|
||||||
|
"""Send a notification to Telegram. Fire-and-forget, never raises.
|
||||||
|
|
||||||
|
Returns the Telegram message_id on success, else None (so callers that want
|
||||||
|
to track the message — the tracker — can store it; legacy callers ignore it).
|
||||||
|
"""
|
||||||
s = _get_settings()
|
s = _get_settings()
|
||||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||||
return
|
return None
|
||||||
try:
|
try:
|
||||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
|
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
|
||||||
httpx.post(
|
resp = httpx.post(
|
||||||
url,
|
url,
|
||||||
json={
|
json={
|
||||||
"chat_id": s.telegram_chat_id,
|
"chat_id": s.telegram_chat_id,
|
||||||
"text": text,
|
"text": text,
|
||||||
"parse_mode": "HTML",
|
"parse_mode": "HTML",
|
||||||
"disable_notification": False,
|
"disable_notification": disable_notification,
|
||||||
},
|
},
|
||||||
timeout=5,
|
timeout=5,
|
||||||
)
|
)
|
||||||
|
data = resp.json()
|
||||||
|
if data.get("ok"):
|
||||||
|
return data["result"]["message_id"]
|
||||||
except Exception:
|
except Exception:
|
||||||
pass # Never crash orchestrator due to notification failure
|
pass # Never crash orchestrator due to notification failure
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Outcome codes returned by edit_telegram; update_task_tracker branches on them:
#   EDIT_OK            the edit was applied, nothing else to do
#   EDIT_NOT_MODIFIED  Telegram reported the text as unchanged; counts as
#                      success and must not trigger a new message
#   EDIT_GONE          the original message can no longer be edited; the
#                      caller falls back to sending a new message
#   EDIT_FAILED        transient or unclassified failure; the caller must not
#                      send a new message (that would create duplicates)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"

# Lower-cased fragments of Telegram error descriptions that mean the message
# is permanently un-editable.
_GONE_MARKERS = (
    "message to edit not found",
    "message can't be edited",
    "message_id_invalid",
)
# Lower-cased fragments that mean "nothing changed".
_NOT_MODIFIED_MARKERS = (
    "message is not modified",
    "exactly the same",
)


def edit_telegram(message_id: int, text: str) -> str:
    """Replace the text of an existing Telegram message. Never raises.

    Returns one of the EDIT_* codes so the caller can distinguish a
    successful edit, an unchanged text, a message that is gone, and a
    transient/unknown failure. EDIT_FAILED is also returned when the bot
    token or chat id is not configured.
    """
    cfg = _get_settings()
    if not (cfg.telegram_bot_token and cfg.telegram_chat_id):
        return EDIT_FAILED

    def _mentions(description, markers):
        # True when any marker occurs as a substring of the description.
        for marker in markers:
            if marker in description:
                return True
        return False

    try:
        endpoint = (
            f"https://api.telegram.org/bot{cfg.telegram_bot_token}/editMessageText"
        )
        payload = {
            "chat_id": cfg.telegram_chat_id,
            "message_id": message_id,
            "text": text,
            "parse_mode": "HTML",
        }
        reply = httpx.post(endpoint, json=payload, timeout=5).json()
        if reply.get("ok"):
            return EDIT_OK

        # Not ok: classify the failure by its (lower-cased) description.
        desc = str(reply.get("description") or "").lower()
        if _mentions(desc, _NOT_MODIFIED_MARKERS):
            logger.debug(
                f"edit_telegram(mid={message_id}): not modified, skipping"
            )
            return EDIT_NOT_MODIFIED
        if _mentions(desc, _GONE_MARKERS):
            logger.warning(
                f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
                f"will fall back to a new message"
            )
            return EDIT_GONE

        # Anything else is treated as transient/unknown: no duplicate message.
        logger.warning(
            f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
        )
        return EDIT_FAILED
    except Exception as e:
        # Request or response-decoding errors land here; also no duplicate.
        logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
        return EDIT_FAILED
|
||||||
|
|
||||||
|
|
||||||
def _get_work_item_id(task_id: int) -> str:
|
def _get_work_item_id(task_id: int) -> str:
|
||||||
@@ -50,26 +158,355 @@ def _get_work_item_id(task_id: int) -> str:
|
|||||||
return f"task-{task_id}"
|
return f"task-{task_id}"
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Live task tracker
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
|
||||||
|
# Pipeline stages shown in the tracker, in order, with their display label and
|
||||||
|
# the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT
|
||||||
|
# an agent stage — it is the human approve gate rendered between Analysis and
|
||||||
|
# Architecture from the task's brd_review_* timestamps.
|
||||||
|
_TRACKER_STAGES = [
|
||||||
|
("analysis", "Analysis", "analyst"),
|
||||||
|
("architecture", "Architecture", "architect"),
|
||||||
|
("development", "Development", "developer"),
|
||||||
|
("review", "Review", "reviewer"),
|
||||||
|
("testing", "Testing", "tester"),
|
||||||
|
("deploy", "Deploy", "deployer"),
|
||||||
|
]
|
||||||
|
|
||||||
|
# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
|
||||||
|
# (development is entered after architecture finishes, etc.) Used to render the
|
||||||
|
# "🔄 <Stage> … идёт" line for the currently-active stage.
|
||||||
|
_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД"
|
||||||
|
|
||||||
|
_STAGE_ACTIVE_AGENT = {
|
||||||
|
"analysis": "analyst",
|
||||||
|
"architecture": "architect",
|
||||||
|
"development": "developer",
|
||||||
|
"review": "reviewer",
|
||||||
|
"testing": "tester",
|
||||||
|
"deploy": "deployer",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _fmt_minutes(seconds) -> str:
|
||||||
|
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
|
||||||
|
try:
|
||||||
|
seconds = int(seconds or 0)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
seconds = 0
|
||||||
|
if seconds <= 0:
|
||||||
|
return "0м"
|
||||||
|
if seconds < 60:
|
||||||
|
return "<1м"
|
||||||
|
return f"{seconds // 60}\u043c"
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_sql_ts(ts):
|
||||||
|
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
|
||||||
|
if not ts:
|
||||||
|
return None
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||||||
|
try:
|
||||||
|
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _duration_seconds(started, finished):
    """Return the whole seconds elapsed from `started` to `finished`.

    Both arguments are parsed with _parse_sql_ts. The result is None when
    either one is missing or unparseable, and is clamped to 0 when `finished`
    precedes `started`.
    """
    begin, end = _parse_sql_ts(started), _parse_sql_ts(finished)
    if begin is None or end is None:
        return None
    elapsed = int((end - begin).total_seconds())
    return elapsed if elapsed > 0 else 0
|
||||||
|
|
||||||
|
|
||||||
|
def render_task_tracker(task_id: int) -> str:
    """Build the full live-tracker text for a task from the DB (stateless render).

    Reads the task header (work_item_id, title, stage, timestamps) and every
    agent_runs row of the task, then renders, in order:
      - a header line (work item id and HTML-escaped title; a "done" variant
        when the stage is 'done') and a separator bar,
      - per pipeline stage either an "in progress" line (with an attempt
        counter from the second run on) or a "finished" line showing duration,
        input/output tokens, cost and short model name of the latest finished
        run whose exit_code is 0 or NULL,
      - right after Analysis, the BRD review line (human gate) once its start
        timestamp is set; while the end timestamp is missing the elapsed time
        is measured against the current UTC time,
      - a separator and the token/cost totals across ALL runs,
      - when done: a wall-clock / agent-time / review-time summary and the
        line returned by _done_link, if any.

    Args:
        task_id: Primary key of the task to render.

    Returns:
        The tracker text, lines joined with newlines. When the task row is
        missing or the DB read fails, the fallback ``"task-<id>"`` is returned.
        Errors raised while formatting (after the DB read) are not caught here.
    """
    from datetime import datetime, timezone

    from .db import get_db
    from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name

    try:
        conn = get_db()
        try:
            task = conn.execute(
                "SELECT id, work_item_id, title, stage, created_at, updated_at, "
                "brd_review_started_at, brd_review_ended_at "
                "FROM tasks WHERE id=?",
                (task_id,),
            ).fetchone()
            if not task:
                return f"task-{task_id}"
            runs = conn.execute(
                "SELECT agent, started_at, finished_at, exit_code, input_tokens, "
                "output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
                "FROM agent_runs WHERE task_id=? ORDER BY id ASC",
                (task_id,),
            ).fetchall()
        finally:
            # Release the connection on every path, including a failed query.
            conn.close()
    except Exception as e:
        logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
        return f"task-{task_id}"

    work_item_id = task["work_item_id"] or f"task-{task_id}"
    title = task["title"] or work_item_id
    stage = task["stage"] or "created"
    done = stage == "done"

    def _usage_of(row):
        # The input-side token columns in the shape _input_total expects.
        return {
            "input_tokens": row["input_tokens"],
            "cache_read_tokens": row["cache_read_tokens"],
            "cache_creation_tokens": row["cache_creation_tokens"],
        }

    # Group runs per agent and remember the latest finished, non-failed run per
    # agent (rows are ordered by id, so later rows overwrite earlier ones).
    last_done = {}
    agent_runs_by_agent = {}
    for r in runs:
        agent_runs_by_agent.setdefault(r["agent"], []).append(r)
        if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
            last_done[r["agent"]] = r

    # Totals across ALL runs (retries and failed runs included).
    total_in = 0
    total_out = 0
    total_cost = 0.0
    agent_seconds = 0
    for r in runs:
        total_in += _input_total(_usage_of(r))
        total_out += int(r["output_tokens"] or 0)
        total_cost += float(r["cost_usd"] or 0.0)
        d = _duration_seconds(r["started_at"], r["finished_at"])
        if d is not None:
            agent_seconds += d

    # The message is sent with parse_mode HTML, so free text is escaped.
    esc_title = html.escape(title)
    header = (
        f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
        if done
        else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
    )
    bar = "\u2501" * 22
    lines = [header, bar]

    def _stage_line(label, run):
        # "Finished" line for one stage, built from a single agent_runs row.
        in_tok = fmt_tokens(_input_total(_usage_of(run)))
        out_tok = fmt_tokens(run["output_tokens"])
        cost = fmt_cost(run["cost_usd"])
        dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
        model = short_model_name(run["model"])
        model_suffix = f" \u00b7 {model}" if model else ""
        return (
            f"\u2705 {label:<13} {dur} \u00b7 "
            f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
        )

    # BRD review (human gate) timing; None until both timestamps are present.
    brd_started = task["brd_review_started_at"]
    brd_ended = task["brd_review_ended_at"]
    review_seconds = _duration_seconds(brd_started, brd_ended)

    for stage_key, label, agent in _TRACKER_STAGES:
        run = last_done.get(agent)
        # A stage is "in progress" only when it is the task's current stage AND
        # its agent either has an unfinished run or no finished run at all. A
        # finished run with nothing in flight shows the result line, even if
        # the task still sits in that stage.
        agent_runs = agent_runs_by_agent.get(agent, [])
        has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
        is_active_stage = (
            _STAGE_ACTIVE_AGENT.get(stage) == agent
            and stage == stage_key
            and (has_inflight or run is None)
        )
        if is_active_stage:
            # From the second run of this agent on, show the attempt number so
            # a re-done stage is visible and the text differs between cycles.
            attempt = len(agent_runs)
            if attempt >= 2:
                lines.append(
                    f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
                    f"\u2026 \u0438\u0434\u0451\u0442"
                )
            else:
                lines.append(
                    f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
                )
        elif run is not None:
            lines.append(_stage_line(label, run))
        # else: stage not started yet -> no line.

        # The BRD review line goes right after Analysis.
        if stage_key == "analysis" and brd_started:
            brd_label = f"{_BRD_LABEL:<13}"
            if review_seconds is not None:
                dur = _fmt_minutes(review_seconds)
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
                )
            else:
                # Review still open (no end timestamp): show time waited so far.
                start_dt = _parse_sql_ts(brd_started)
                waited = None
                if start_dt is not None:
                    waited = int(
                        (datetime.now(timezone.utc) - start_dt).total_seconds()
                    )
                dur = _fmt_minutes(waited) if waited is not None else "\u2026"
                lines.append(
                    f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
                )

    lines.append(bar)
    lines.append(
        f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
        f"{fmt_cost(total_cost)}"
    )

    if done:
        wall = _duration_seconds(task["created_at"], task["updated_at"])
        wall_str = _fmt_minutes(wall) if wall is not None else "?"
        # _fmt_minutes already renders None and 0 as the zero-minutes string.
        review_str = _fmt_minutes(review_seconds)
        lines.append(
            f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
            f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
            f"\u0442\u0432\u043e\u0451 {review_str}"
        )
        link = _done_link(task_id, task["work_item_id"])
        if link:
            lines.append(link)

    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _done_link(task_id: int, work_item_id) -> str | None:
    """Build the final "PR #n / deployed" tracker line. Never raises.

    Reads the task's repo and branch from the DB, then asks Gitea for the pull
    request whose head branch is that branch (any state) to include its number.

    Args:
        task_id: Primary key of the task.
        work_item_id: Accepted for the caller's convenience; not used here.

    Returns:
        The "deployed" marker, prefixed with the PR reference when a PR for
        the branch is found; None when the task row is missing or anything
        outside the PR lookup fails.
    """
    try:
        from .config import settings
        from .db import get_db

        conn = get_db()
        try:
            row = conn.execute(
                "SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            # Release the connection even when the query raises.
            conn.close()
        if not row:
            return None
        repo, branch = row["repo"], row["branch"]

        pr_part = None
        try:
            owner = settings.gitea_owner
            headers = {"Authorization": f"token {settings.gitea_token}"}
            resp = httpx.get(
                f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
                params={"state": "all", "head": branch},
                headers=headers,
                timeout=5,
            )
            if resp.status_code == 200:
                # Match the head branch ourselves rather than trusting the
                # server-side `head` filter; otherwise the first PR of the
                # repository could be linked to an unrelated task.
                # NOTE(review): confirm whether the deployed Gitea honours
                # `head`; only the first page of results is inspected.
                for pr in resp.json() or []:
                    if (pr.get("head") or {}).get("ref") == branch:
                        pr_part = f"\U0001f517 PR #{pr.get('number')}"
                        break
        except Exception:
            # The PR reference is optional; the line is still rendered.
            pr_part = None

        parts = [pr_part] if pr_part else []
        parts.append("\U0001f4e6 deployed")
        return " \u00b7 ".join(parts)
    except Exception:
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def update_task_tracker(task_id: int):
    """Render the live tracker for a task and push it to Telegram. Never raises.

    Without a stored tracker_message_id the text is sent as a new silent
    message and the returned id is stored. With a stored id the message is
    edited in place; a replacement message is sent (and the stored id
    re-pointed) only when edit_telegram reports EDIT_GONE. EDIT_OK,
    EDIT_NOT_MODIFIED and EDIT_FAILED all leave the stored message as is, so
    no duplicate tracker is created.
    """
    try:
        from .db import get_tracker_message_id, set_tracker_message_id

        text = render_task_tracker(task_id)
        stored_id = get_tracker_message_id(task_id)

        if stored_id is not None:
            outcome = edit_telegram(stored_id, text)
            if outcome == EDIT_FAILED:
                # Transient problem: keep the message, redraw on next update.
                logger.debug(
                    f"update_task_tracker({task_id}): edit failed transiently, "
                    f"keeping message {stored_id}"
                )
                return
            if outcome in (EDIT_OK, EDIT_NOT_MODIFIED):
                # Edited in place, or nothing to change.
                return
            # Remaining outcome (EDIT_GONE): replace the message below.

        fresh_id = send_telegram(text, disable_notification=True)
        if fresh_id is not None:
            set_tracker_message_id(task_id, fresh_id)
    except Exception as e:
        logger.warning(f"update_task_tracker({task_id}) failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
|
||||||
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
    """Record a stage transition in the log, then redraw the task tracker.

    The transition line is only logged; no standalone Telegram message is
    sent from here. When ``agent`` is truthy its name is appended to the
    logged line.
    """
    item = _get_work_item_id(task_id)
    # Optional suffix naming the agent that was launched for the new stage.
    launched = f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" if agent else ""
    logger.info(f"\U0001f504 {item}: {old_stage} \u2192 {new_stage}" + launched)
    update_task_tracker(task_id)
|
||||||
|
|
||||||
|
|
||||||
def notify_agent_started(run_id: int, agent: str, task_id: int):
    """Log that an agent run started; redraw the tracker when a task id is given.

    No separate Telegram message is sent. The work item id is resolved and
    logged regardless of ``task_id``; only the tracker refresh is skipped
    when ``task_id`` is falsy.
    """
    item = _get_work_item_id(task_id)
    launch_line = f"\U0001f680 {item}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
    logger.info(launch_line)
    if not task_id:
        return
    update_task_tracker(task_id)
|
||||||
|
|
||||||
|
|
||||||
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
||||||
"""Notify agent completion."""
|
"""Log agent completion and refresh the tracker (no separate message).
|
||||||
|
|
||||||
|
The agent-FAILED alert (exit_code != 0) is still sent separately by the
|
||||||
|
launcher via send_telegram; this helper itself only logs + refreshes.
|
||||||
|
"""
|
||||||
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
||||||
if exit_code == 0:
|
if exit_code == 0:
|
||||||
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
||||||
@@ -79,47 +516,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int
|
|||||||
else:
|
else:
|
||||||
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
||||||
logger.info(msg)
|
logger.info(msg)
|
||||||
send_telegram(msg)
|
if task_id:
|
||||||
|
update_task_tracker(task_id)
|
||||||
|
|
||||||
|
|
||||||
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
    """Log the outcome of a quality-gate check; nothing is sent to Telegram.

    A passing check is logged at INFO level, a failing one at WARNING level
    together with ``reason``.
    """
    item = _get_work_item_id(task_id)
    if not passed:
        logger.warning(f"\u26a0\ufe0f {item}: QG {check} \u2014 failed: {reason}")
        return
    logger.info(f"\u2705 {item}: QG {check} \u2014 passed")
|
|
||||||
|
|
||||||
|
|
||||||
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
    """Log a quality-gate failure at WARNING level (log-only, no Telegram message).

    ``stage`` is accepted for interface compatibility but is not used in the
    logged line.
    """
    item = _get_work_item_id(task_id)
    failure_line = f"\u26a0\ufe0f {item}: QG {check} \u2014 failed: {reason}"
    logger.warning(failure_line)
|
|
||||||
|
|
||||||
|
|
||||||
def notify_approve_requested(task_id: int):
    """Alert that the BRD/TZ/AC are ready and the task awaits the Approved status.

    Steps, in order:
      1. best-effort call to ``mark_brd_review_started`` (a failure is only
         logged as a warning and does not stop the alert);
      2. log the request;
      3. redraw the task tracker;
      4. send the request as its own Telegram message.
    """
    item = _get_work_item_id(task_id)

    # Starting the review clock must never block the alert itself.
    try:
        from .db import mark_brd_review_started

        mark_brd_review_started(task_id)
    except Exception as exc:
        logger.warning(f"notify_approve_requested: brd clock start failed: {exc}")

    request_text = "".join(
        [
            f"\U0001f4cb {item}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. ",
            "\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved ",
            "\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f.",
        ]
    )
    logger.info(request_text)
    update_task_tracker(task_id)
    send_telegram(request_text)  # separate message (not the tracker)
|
||||||
|
|
||||||
|
|
||||||
def notify_done(task_id: int):
    """Log task completion and redraw the tracker; no separate message is sent."""
    item = _get_work_item_id(task_id)
    done_line = f"\U0001f389 {item}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
    logger.info(done_line)
    update_task_tracker(task_id)
|
|
||||||
|
|
||||||
|
|
||||||
def notify_error(task_id: int, error: str):
    """Log a task error at ERROR level and send it as its own Telegram message.

    When ``task_id`` is falsy the error is attributed to ``"system"`` instead
    of a work item.
    """
    item = "system" if not task_id else _get_work_item_id(task_id)
    alert = f"\U0001f534 {item}: ERROR \u2014 {error}"
    logger.error(alert)
    send_telegram(alert)  # separate message (not the tracker)
|
||||||
|
|||||||
@@ -197,6 +197,42 @@ def fetch_issue_description(issue_id: str, project_id: str) -> str:
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
    """Fetch a Plane issue once and return its ``(name, description)``.

    A single GET against the issue-detail endpoint supplies both values, so
    callers needing the title and the description do not issue two requests.

    The name is whitespace-stripped. The description is taken from
    ``description_stripped`` when that field is non-blank; otherwise
    ``description_html`` is passed through ``_strip_html``.

    Returns:
        ``(name, description)``, or ``("", "")`` when anything goes wrong
        (network error, non-2xx response, unexpected body). Never raises;
        the failure is logged as a warning.
    """
    endpoint = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/{issue_id}/"
    try:
        response = httpx.get(endpoint, headers=PLANE_HEADERS, timeout=10)
        response.raise_for_status()
        payload = response.json()
        title = (payload.get("name") or "").strip()
        stripped = payload.get("description_stripped")
        if not (stripped and stripped.strip()):
            # No usable plain-text description: derive one from the HTML body.
            return title, _strip_html(payload.get("description_html") or "")
        return title, stripped
    except Exception as exc:
        logger.warning(f"fetch_issue_fields failed for {issue_id}: {exc}")
        return "", ""
|
||||||
|
|
||||||
|
|
||||||
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
|
||||||
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
|
||||||
project_id = _resolve_project_id(work_item_id, project_id)
|
project_id = _resolve_project_id(work_item_id, project_id)
|
||||||
@@ -307,6 +343,17 @@ def set_issue_blocked(work_item_id: str, project_id: str = None):
|
|||||||
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)
|
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)
|
||||||
|
|
||||||
|
|
||||||
|
def set_issue_done(work_item_id: str, project_id: str = None):
    """Move the Plane issue into the state stored under ``PLANE_STATES["done"]``.

    Delegates to ``_set_issue_state_direct``; the state mapping itself is
    only read, not modified.
    """
    done_state = PLANE_STATES["done"]
    _set_issue_state_direct(work_item_id, done_state, project_id)
|
||||||
|
|
||||||
|
|
||||||
def set_issue_in_progress(work_item_id: str, project_id: str = None):
    """Move the Plane issue into the ``PLANE_STATES["in_progress"]`` state (agent working)."""
    in_progress_state = PLANE_STATES["in_progress"]
    _set_issue_state_direct(work_item_id, in_progress_state, project_id)
|
||||||
|
|||||||
107
src/qg/checks.py
107
src/qg/checks.py
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
import subprocess
|
||||||
import httpx
|
import httpx
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
|
|
||||||
@@ -249,9 +250,17 @@ def check_reviewer_verdict(repo: str, work_item_id: str, branch: str | None = No
|
|||||||
|
|
||||||
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
|
DEPRECATED: replaced by check_ci_green on the development stage (CI is now
|
||||||
|
configured). Kept for backward-compat; not wired to any stage.
|
||||||
|
|
||||||
S-1 fix: run the project test suite locally and judge by exit code, instead of
|
S-1 fix: run the project test suite locally and judge by exit code, instead of
|
||||||
depending on Gitea CI (which is not configured -> always false).
|
depending on Gitea CI (which is not configured -> always false).
|
||||||
|
|
||||||
|
БАГ 5 fix: invoke pytest directly instead of make test. make is not installed
|
||||||
|
in the orchestrator container, so the previous ["make", "test"] call raised
|
||||||
|
FileNotFoundError. This reproduces the Makefile test target 1:1
|
||||||
|
(cd src/api && python -m pytest ../../tests/ -v).
|
||||||
|
|
||||||
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
|
ORCH-2 / S-4: tests run inside the per-branch worktree (ensure_worktree), so this
|
||||||
is safe for concurrent active tasks — no shared /repos checkout race.
|
is safe for concurrent active tasks — no shared /repos checkout race.
|
||||||
"""
|
"""
|
||||||
@@ -259,7 +268,8 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
|||||||
try:
|
try:
|
||||||
repo_path = ensure_worktree(repo, branch)
|
repo_path = ensure_worktree(repo, branch)
|
||||||
r = subprocess.run(
|
r = subprocess.run(
|
||||||
["make", "test"], cwd=repo_path,
|
["python", "-m", "pytest", "../../tests/", "-v"],
|
||||||
|
cwd=os.path.join(repo_path, "src", "api"),
|
||||||
capture_output=True, text=True, timeout=600,
|
capture_output=True, text=True, timeout=600,
|
||||||
)
|
)
|
||||||
if r.returncode == 0:
|
if r.returncode == 0:
|
||||||
@@ -272,6 +282,100 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
|||||||
return False, f"Local test run error: {e}"
|
return False, f"Local test run error: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_deploy_status(content: str) -> tuple[bool, str]:
|
||||||
|
"""Parse a 14-deploy-log.md body and map its `deploy_status:` frontmatter to a
|
||||||
|
quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose.
|
||||||
|
|
||||||
|
deploy_status: SUCCESS -> (True, "Deploy status: SUCCESS")
|
||||||
|
deploy_status: FAILED -> (False, "Deploy status: FAILED")
|
||||||
|
missing field / no frontmatter / bad YAML -> (False, <reason>)
|
||||||
|
"""
|
||||||
|
import yaml
|
||||||
|
status = None
|
||||||
|
if content.startswith("---"):
|
||||||
|
parts = content.split("---", 2)
|
||||||
|
if len(parts) >= 3:
|
||||||
|
try:
|
||||||
|
fm = yaml.safe_load(parts[1]) or {}
|
||||||
|
except yaml.YAMLError as e:
|
||||||
|
return False, f"Invalid YAML frontmatter in deploy log: {e}"
|
||||||
|
status = str(fm.get("deploy_status", "")).upper().strip()
|
||||||
|
if status == "SUCCESS":
|
||||||
|
return True, "Deploy status: SUCCESS"
|
||||||
|
if status == "FAILED":
|
||||||
|
return False, "Deploy status: FAILED"
|
||||||
|
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
|
||||||
|
|
||||||
|
|
||||||
|
def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None:
    """Best-effort read of 14-deploy-log.md from ``origin/main``.

    Looks in the clone at ``settings.repos_dir/<repo>``: fetches
    ``origin main`` (the fetch result is deliberately ignored, so a stale
    ref is still usable) and then reads
    ``docs/work-items/<work_item_id>/14-deploy-log.md`` with ``git show``.

    Args:
        repo: repository directory name under ``settings.repos_dir``.
        work_item_id: work item whose deploy log is wanted.

    Returns:
        The file content, or ``None`` when the clone has no ``.git``
        directory, a git command cannot be run or times out, or ``git show``
        exits non-zero (e.g. the file is absent in main). Never raises.
    """
    repo_clone = os.path.join(settings.repos_dir, repo)
    if not os.path.isdir(os.path.join(repo_clone, ".git")):
        return None
    rel = f"docs/work-items/{work_item_id}/14-deploy-log.md"
    try:
        # Refresh origin/main so freshly merged deploy artifacts are visible.
        subprocess.run(
            ["git", "-C", repo_clone, "fetch", "origin", "main"],
            check=False, capture_output=True, timeout=30,
        )
        # Decode explicitly as UTF-8 and replace undecodable bytes: a decode
        # error would otherwise escape the handler below, and only the ASCII
        # frontmatter matters to the caller.
        show = subprocess.run(
            ["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
            check=False, capture_output=True, timeout=15,
            encoding="utf-8", errors="replace",
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
        return None
    if show.returncode != 0:
        return None
    return show.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
    """Quality gate: judge a deploy by ``deploy_status:`` in 14-deploy-log.md.

    The verdict comes from the YAML frontmatter of the deploy log (via
    ``_parse_deploy_status``), not from any process exit code.

    Lookup order for the log file:
      1. ``docs/work-items/<work_item_id>/14-deploy-log.md`` under
         ``_repo_path(repo, branch)``;
      2. the same path on ``origin/main`` (via ``_deploy_log_from_main``);
      3. otherwise the log is reported as not found.

    Returns:
        ``(True, ...)`` only for ``deploy_status: SUCCESS``; ``(False, reason)``
        for FAILED, a missing or invalid field, an unreadable file, or a
        missing log.
    """
    repo_path = _repo_path(repo, branch)
    log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")

    if os.path.isfile(log_path):
        try:
            # Read as UTF-8 regardless of the process locale; undecodable bytes
            # are replaced so a stray byte in the prose body cannot crash the
            # gate (only the frontmatter is parsed).
            with open(log_path, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        except OSError as e:
            return False, f"Error reading deploy log: {e}"
        return _parse_deploy_status(content)

    # Not in the feature worktree: the log may have been merged into main.
    main_content = _deploy_log_from_main(repo, work_item_id)
    if main_content is not None:
        return _parse_deploy_status(main_content)

    return False, "Deploy log not found (14-deploy-log.md)"
|
||||||
|
|
||||||
|
|
||||||
# Registry for dynamic lookup by name
|
# Registry for dynamic lookup by name
|
||||||
QG_CHECKS = {
|
QG_CHECKS = {
|
||||||
"check_analysis_approved": check_analysis_approved,
|
"check_analysis_approved": check_analysis_approved,
|
||||||
@@ -282,4 +386,5 @@ QG_CHECKS = {
|
|||||||
"check_tests_passed": check_tests_passed,
|
"check_tests_passed": check_tests_passed,
|
||||||
"check_reviewer_verdict": check_reviewer_verdict,
|
"check_reviewer_verdict": check_reviewer_verdict,
|
||||||
"check_tests_local": check_tests_local,
|
"check_tests_local": check_tests_local,
|
||||||
|
"check_deploy_status": check_deploy_status,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ from .plane_sync import (
|
|||||||
set_issue_needs_input,
|
set_issue_needs_input,
|
||||||
set_issue_in_progress,
|
set_issue_in_progress,
|
||||||
set_issue_blocked,
|
set_issue_blocked,
|
||||||
|
set_issue_done,
|
||||||
)
|
)
|
||||||
from .config import settings
|
from .config import settings
|
||||||
|
|
||||||
@@ -189,36 +190,48 @@ def advance_stage(
|
|||||||
|
|
||||||
# --- Quality gate ----------------------------------------------------
|
# --- Quality gate ----------------------------------------------------
|
||||||
if qg_name and qg_name in QG_CHECKS:
|
if qg_name and qg_name in QG_CHECKS:
|
||||||
# Human-approval gate: special analyst approved-flow (launcher only).
|
# Human-approval gate: split by path.
|
||||||
if qg_name == "check_analysis_approved":
|
if qg_name == "check_analysis_approved":
|
||||||
_handle_analysis_approved_flow(
|
# Launcher path (analyst just finished): set In Review + ask for
|
||||||
task_id, current_stage, repo, work_item_id, branch, agent, result
|
# the Approved status. This gate never advances on its own -- a
|
||||||
)
|
# human Approved verdict does that.
|
||||||
return result
|
if agent == "analyst":
|
||||||
|
_handle_analysis_approved_flow(
|
||||||
|
task_id, current_stage, repo, work_item_id, branch, agent, result
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
# Webhook Approved-verdict path (agent is None): the human flipped
|
||||||
|
# the Plane status to Approved, which IS the approval. The gate is
|
||||||
|
# satisfied -- do NOT re-run check_analysis_approved (it looks for
|
||||||
|
# an :approved: *comment* and would block on a status-only
|
||||||
|
# approval). Mark it passed and fall through to the Advance block.
|
||||||
|
result.qg_name = qg_name
|
||||||
|
result.qg_passed = True
|
||||||
|
result.qg_reason = "approved-via-status"
|
||||||
|
else:
|
||||||
|
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
||||||
|
result.qg_passed = passed
|
||||||
|
result.qg_reason = reason
|
||||||
|
|
||||||
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
|
if not passed:
|
||||||
result.qg_passed = passed
|
logger.info(
|
||||||
result.qg_reason = reason
|
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
||||||
|
)
|
||||||
|
# Behaviour parity:
|
||||||
|
# - webhook path (finished_agent is None): emit the generic
|
||||||
|
# QG-failure notification, exactly like the old plane handler.
|
||||||
|
# - launcher path (finished_agent set): NO generic notification;
|
||||||
|
# the rollback branches below own their own messaging, exactly
|
||||||
|
# like the old launcher handler.
|
||||||
|
if agent is None:
|
||||||
|
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
||||||
|
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
||||||
|
|
||||||
if not passed:
|
_handle_qg_failure_rollbacks(
|
||||||
logger.info(
|
task_id, current_stage, repo, work_item_id, branch,
|
||||||
f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}"
|
agent, qg_name, reason, result,
|
||||||
)
|
)
|
||||||
# Behaviour parity:
|
return result
|
||||||
# - webhook path (finished_agent is None): emit the generic
|
|
||||||
# QG-failure notification, exactly like the old plane handler.
|
|
||||||
# - launcher path (finished_agent set): NO generic notification;
|
|
||||||
# the rollback branches below own their own messaging, exactly
|
|
||||||
# like the old launcher handler.
|
|
||||||
if agent is None:
|
|
||||||
notify_qg_failure(task_id, current_stage, qg_name, reason)
|
|
||||||
plane_notify_qg(work_item_id, current_stage, qg_name, reason)
|
|
||||||
|
|
||||||
_handle_qg_failure_rollbacks(
|
|
||||||
task_id, current_stage, repo, work_item_id, branch,
|
|
||||||
agent, qg_name, reason, result,
|
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
elif qg_name:
|
elif qg_name:
|
||||||
# QG name set but not registered — do not advance (launcher behavior).
|
# QG name set but not registered — do not advance (launcher behavior).
|
||||||
@@ -227,6 +240,15 @@ def advance_stage(
|
|||||||
|
|
||||||
# --- Advance ---------------------------------------------------------
|
# --- Advance ---------------------------------------------------------
|
||||||
update_task_stage(task_id, next_stage)
|
update_task_stage(task_id, next_stage)
|
||||||
|
# Telegram live tracker: the analysis->architecture advance is the human
|
||||||
|
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
|
||||||
|
# human time). Idempotent: only the first stamp counts.
|
||||||
|
if current_stage == "analysis" and next_stage == "architecture":
|
||||||
|
try:
|
||||||
|
from .db import mark_brd_review_ended
|
||||||
|
mark_brd_review_ended(task_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Task {task_id}: brd review end stamp failed: {e}")
|
||||||
notify_stage_change(task_id, current_stage, next_stage)
|
notify_stage_change(task_id, current_stage, next_stage)
|
||||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||||
result.advanced = True
|
result.advanced = True
|
||||||
@@ -235,6 +257,22 @@ def advance_stage(
|
|||||||
f"(auto-advance after {agent})"
|
f"(auto-advance after {agent})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# --- Terminal sync: deploy -> done must reach Plane's Done -----------
|
||||||
|
# When the deployer's check_deploy_status passes we advance to the
|
||||||
|
# terminal 'done' stage. Previously a merged-PR webhook completed the
|
||||||
|
# task out-of-band and Plane stuck on In Progress. Now done flows through
|
||||||
|
# here, so explicitly drive the Plane issue into the terminal Done state
|
||||||
|
# (PLANE_STATES['done'] — mapping unchanged) in addition to the
|
||||||
|
# stage-change comment above.
|
||||||
|
if next_stage == "done" and work_item_id:
|
||||||
|
try:
|
||||||
|
set_issue_done(work_item_id)
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: deploy->done, Plane state forced to Done"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Task {task_id}: failed to set Plane Done: {e}")
|
||||||
|
|
||||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||||
next_agent = get_agent_for_stage(current_stage)
|
next_agent = get_agent_for_stage(current_stage)
|
||||||
if next_agent:
|
if next_agent:
|
||||||
@@ -257,6 +295,58 @@ def advance_stage(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _build_analyst_ready_comment(repo: str, work_item_id: str, branch: str) -> str:
    """Build the HTML comment posted when the analyst artifacts are ready.

    The text asks the stakeholder to move the task to the Approved status
    (or to comment a reason and move it to Rejected), followed by a list of
    links to the analyst documents in the Gitea web view:

        {base}/{owner}/{repo}/src/branch/{branch}/docs/work-items/{wid}/<file>

    ``base`` is ``settings.gitea_public_url`` when set, else
    ``settings.gitea_url``; ``owner`` is ``settings.gitea_owner`` (default
    ``"admin"``).

    Only candidate files that exist in the branch worktree are linked.
    NOTE: if the worktree path cannot be resolved (``get_worktree_path``
    raises), the existence check is skipped and every candidate is linked.

    The URL path is percent-encoded and the resulting ``href`` and label are
    HTML-escaped, so unusual characters in ``repo``, ``branch`` or
    ``work_item_id`` cannot break the link or the markup.

    Returns:
        The comment body as an HTML string.
    """
    from html import escape
    from urllib.parse import quote

    text = (
        "\u2705 BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
        "\u0414\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f "
        "\u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 "
        "\u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved. "
        "\u0414\u043b\u044f \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f \u2014 "
        "\u043d\u0430\u043f\u0438\u0448\u0438\u0442\u0435 \u043f\u0440\u0438\u0447\u0438\u043d\u0443 "
        "\u043a\u043e\u043c\u043c\u0435\u043d\u0442\u043e\u043c \u0438 \u043f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 "
        "\u0432 Rejected."
    )

    # Candidate analyst artifacts (label -> filename). Only existing ones are linked.
    candidates = [
        ("Business request", "00-business-request.md"),
        ("BRD", "01-brd.md"),
        ("\u0422\u0417 (TRZ)", "02-trz.md"),
        ("Acceptance Criteria", "03-acceptance-criteria.md"),
        ("Test Plan", "04-test-plan.yaml"),
        ("UI Test Cases", "04b-ui-test-cases.md"),
    ]
    rel_dir = f"docs/work-items/{work_item_id}"
    try:
        wt_dir = os.path.join(get_worktree_path(repo, branch), rel_dir)
    except Exception:
        # Best effort: without a worktree the existence filter is skipped.
        wt_dir = None

    owner = getattr(settings, "gitea_owner", "admin")
    base = (getattr(settings, "gitea_public_url", "") or settings.gitea_url).rstrip("/")
    links = []
    for label, fname in candidates:
        if wt_dir and not os.path.isfile(os.path.join(wt_dir, fname)):
            continue
        # Percent-encode the path ('/' kept, so branch names with slashes still
        # resolve), then HTML-escape the whole attribute value.
        url_path = quote(f"{owner}/{repo}/src/branch/{branch}/{rel_dir}/{fname}", safe="/")
        href = escape(f"{base}/{url_path}", quote=True)
        links.append(f'<li><a href="{href}">{escape(label)}</a></li>')

    if links:
        text += "<br><b>\u0414\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u044b:</b><ul>" + "".join(links) + "</ul>"
    return text
|
||||||
|
|
||||||
|
|
||||||
def _handle_analysis_approved_flow(
|
def _handle_analysis_approved_flow(
|
||||||
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
task_id, current_stage, repo, work_item_id, branch, agent, result: AdvanceResult
|
||||||
):
|
):
|
||||||
@@ -279,19 +369,17 @@ def _handle_analysis_approved_flow(
|
|||||||
|
|
||||||
files_ok, _ = files_check(repo, work_item_id, branch)
|
files_ok, _ = files_check(repo, work_item_id, branch)
|
||||||
if files_ok:
|
if files_ok:
|
||||||
# Full artifacts ready -> In Review, ask for :approved:.
|
# Full artifacts ready -> In Review, ask for the Approved STATUS (BUG C).
|
||||||
set_issue_in_review(work_item_id)
|
set_issue_in_review(work_item_id)
|
||||||
plane_add_comment(
|
plane_add_comment(
|
||||||
work_item_id,
|
work_item_id,
|
||||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
_build_analyst_ready_comment(repo, work_item_id, branch),
|
||||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: "
|
|
||||||
"\u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture.",
|
|
||||||
author="analyst",
|
author="analyst",
|
||||||
)
|
)
|
||||||
notify_approve_requested(task_id)
|
notify_approve_requested(task_id)
|
||||||
result.note = "analysis-in-review"
|
result.note = "analysis-in-review"
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Task {task_id}: analyst finished, requested :approved: in Plane"
|
f"Task {task_id}: analyst finished, requested Approved status in Plane"
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -428,3 +516,31 @@ def _handle_qg_failure_rollbacks(
|
|||||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||||
f"(job_id={new_job})"
|
f"(job_id={new_job})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
|
||||||
|
# The launcher's exit_code-based guard (launcher.py:475) never fires because
|
||||||
|
# the LLM process exit code is always 0; this gate fires on the machine-readable
|
||||||
|
# deploy_status verdict in 14-deploy-log.md instead. Mirrors the launcher block
|
||||||
|
# (rollback + set_issue_blocked + notify) but is driven by the VERDICT.
|
||||||
|
if agent == "deployer" and qg_name == "check_deploy_status":
|
||||||
|
update_task_stage(task_id, "development")
|
||||||
|
notify_stage_change(task_id, current_stage, "development")
|
||||||
|
plane_notify_stage(work_item_id, current_stage, "development")
|
||||||
|
result.rolled_back_to = "development"
|
||||||
|
set_issue_blocked(work_item_id)
|
||||||
|
notify_qg_failure(task_id, "deploy", "check_deploy_status", reason)
|
||||||
|
plane_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
f"\u274c Deploy FAILED ({reason}). Rolled back to development. "
|
||||||
|
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||||
|
author="deployer",
|
||||||
|
)
|
||||||
|
send_telegram(
|
||||||
|
f"\U0001f6a8 {work_item_id}: Deploy FAILED ({reason}). "
|
||||||
|
f"Rolled back to development. Needs fix."
|
||||||
|
)
|
||||||
|
result.alerted = True
|
||||||
|
logger.error(
|
||||||
|
f"Task {task_id}: deployer verdict FAILED, rolled back deploy -> "
|
||||||
|
f"development ({reason})"
|
||||||
|
)
|
||||||
|
|||||||
@@ -13,10 +13,10 @@ STAGE_TRANSITIONS = {
|
|||||||
"created": {"next": "analysis", "agent": "analyst", "qg": None},
|
"created": {"next": "analysis", "agent": "analyst", "qg": None},
|
||||||
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
|
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
|
||||||
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
|
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
|
||||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_tests_local"},
|
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
|
||||||
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
||||||
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
||||||
"deploy": {"next": "done", "agent": None, "qg": None},
|
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
|
||||||
"done": {"next": None, "agent": None, "qg": None},
|
"done": {"next": None, "agent": None, "qg": None},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
224
src/usage.py
224
src/usage.py
@@ -31,7 +31,8 @@ def parse_usage_from_text(text: str) -> dict | None:
|
|||||||
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
||||||
|
|
||||||
Returns a normalised dict
|
Returns a normalised dict
|
||||||
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
|
{input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
|
||||||
|
cost_usd}
|
||||||
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
||||||
"""
|
"""
|
||||||
if not text:
|
if not text:
|
||||||
@@ -71,10 +72,67 @@ def parse_usage_from_text(text: str) -> dict | None:
|
|||||||
"cache_read_tokens": _int(
|
"cache_read_tokens": _int(
|
||||||
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
||||||
),
|
),
|
||||||
|
# The cache-CREATION slice (writing new cache entries) is part of the
|
||||||
|
# REAL input and used to be dropped on the floor. Persist it so the
|
||||||
|
# "X in" figure reflects the full prompt size, not just fresh tokens.
|
||||||
|
"cache_creation_tokens": _int(
|
||||||
|
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
|
||||||
|
),
|
||||||
"cost_usd": _float(cost),
|
"cost_usd": _float(cost),
|
||||||
|
# Telegram live tracker: the model the run actually used. claude
|
||||||
|
# --output-format json reports it under modelUsage (a dict keyed by the
|
||||||
|
# full model id) and/or a top-level "model" field. We keep the FULL name
|
||||||
|
# here; short_model_name() trims it for the tracker. None when unknown.
|
||||||
|
"model": _extract_model(candidate),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_model(candidate: dict) -> str | None:
|
||||||
|
"""Best-effort: pull the model id out of a claude result JSON object.
|
||||||
|
|
||||||
|
Prefers modelUsage (a dict keyed by full model ids, e.g.
|
||||||
|
{"claude-opus-4-8": {...}}) and returns the key with the most output
|
||||||
|
tokens; falls back to a top-level "model" string. Never raises -> None.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
mu = candidate.get("modelUsage")
|
||||||
|
if isinstance(mu, dict) and mu:
|
||||||
|
def _out(v):
|
||||||
|
try:
|
||||||
|
return int((v or {}).get("outputTokens", 0))
|
||||||
|
except (TypeError, ValueError, AttributeError):
|
||||||
|
return 0
|
||||||
|
best = max(mu.items(), key=lambda kv: _out(kv[1]))
|
||||||
|
if best and best[0]:
|
||||||
|
return str(best[0])
|
||||||
|
model = candidate.get("model")
|
||||||
|
if isinstance(model, str) and model:
|
||||||
|
return model
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def short_model_name(full: str | None) -> str:
|
||||||
|
"""Trim a full model id to a short tag for the tracker.
|
||||||
|
|
||||||
|
'tokenator/claude-opus-4-8' -> 'opus-4-8'
|
||||||
|
'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
|
||||||
|
'claude-opus-4-8' -> 'opus-4-8'
|
||||||
|
Returns '' when full is falsy so callers can omit the ' · <model>' suffix.
|
||||||
|
"""
|
||||||
|
if not full:
|
||||||
|
return ""
|
||||||
|
name = str(full).strip()
|
||||||
|
# Drop any provider prefix up to and including the last '/'.
|
||||||
|
if "/" in name:
|
||||||
|
name = name.rsplit("/", 1)[-1]
|
||||||
|
# Drop a leading 'claude-' marketing prefix.
|
||||||
|
if name.startswith("claude-"):
|
||||||
|
name = name[len("claude-"):]
|
||||||
|
return name
|
||||||
|
|
||||||
|
|
||||||
def _extract_last_json_object(text: str) -> dict | None:
|
def _extract_last_json_object(text: str) -> dict | None:
|
||||||
"""Return the last balanced top-level JSON object in `text` that parses.
|
"""Return the last balanced top-level JSON object in `text` that parses.
|
||||||
|
|
||||||
@@ -150,12 +208,15 @@ def record_usage(run_id: int, usage: dict | None):
|
|||||||
try:
|
try:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
||||||
"cache_read_tokens=?, cost_usd=? WHERE id=?",
|
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
|
||||||
|
"model=COALESCE(?, model) WHERE id=?",
|
||||||
(
|
(
|
||||||
usage.get("input_tokens"),
|
usage.get("input_tokens"),
|
||||||
usage.get("output_tokens"),
|
usage.get("output_tokens"),
|
||||||
usage.get("cache_read_tokens"),
|
usage.get("cache_read_tokens"),
|
||||||
|
usage.get("cache_creation_tokens"),
|
||||||
usage.get("cost_usd"),
|
usage.get("cost_usd"),
|
||||||
|
usage.get("model"),
|
||||||
run_id,
|
run_id,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
@@ -197,19 +258,132 @@ AGENT_DISPLAY = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def usage_comment(agent: str, usage: dict | None) -> str:
|
def _usage_int(usage: dict, key: str) -> int:
    """Read ``usage[key]`` as an int.

    Missing, ``None``, zero, non-numeric and infinite values all count as 0.
    """
    try:
        return int(usage.get(key) or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _input_total(usage: dict) -> int:
    """FULL input = fresh input + cache-read + cache-creation tokens."""
    return (
        _usage_int(usage, "input_tokens")
        + _usage_int(usage, "cache_read_tokens")
        + _usage_int(usage, "cache_creation_tokens")
    )


def _cached_total(usage: dict) -> int:
    """Cached portion of the input = cache-read + cache-creation tokens."""
    return _usage_int(usage, "cache_read_tokens") + _usage_int(
        usage, "cache_creation_tokens"
    )
|
||||||
|
|
||||||
|
|
||||||
|
def fmt_in(usage: dict) -> str:
    """Format the input-token figure: the full total plus a cached breakdown.

    Yields e.g. '8.5M in (8.4M cached)' when some of the input was cached and
    e.g. '45.2k in' when the cached count is zero.
    """
    cached = _cached_total(usage)
    # The parenthesised breakdown is only shown for a positive cached count.
    suffix = f" ({fmt_tokens(cached)} cached)" if cached > 0 else ""
    return f"{fmt_tokens(_input_total(usage))} in{suffix}"
|
||||||
|
|
||||||
|
|
||||||
|
def usage_comment(
|
||||||
|
agent: str,
|
||||||
|
usage: dict | None,
|
||||||
|
repo: str | None = None,
|
||||||
|
branch: str | None = None,
|
||||||
|
work_item_id: str | None = None,
|
||||||
|
pr_number=None,
|
||||||
|
) -> str:
|
||||||
"""Build the per-agent finish comment, e.g.
|
"""Build the per-agent finish comment, e.g.
|
||||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
|
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 8.5M in (8.4M cached) / 45.8k out \u00b7 $7.29'.
|
||||||
|
|
||||||
|
When repo/branch/work_item_id are supplied, the agent's artifact link(s) are
|
||||||
|
appended (BUG: only analyst used to link its docs). Missing artifacts are
|
||||||
|
silently skipped — link building never raises.
|
||||||
"""
|
"""
|
||||||
usage = usage or {}
|
usage = usage or {}
|
||||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||||
icon = AGENT_ICON.get(agent, "\u2705")
|
icon = AGENT_ICON.get(agent, "\u2705")
|
||||||
return (
|
line = (
|
||||||
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
|
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
|
||||||
f"{fmt_tokens(usage.get('input_tokens'))} in / "
|
f"{fmt_in(usage)} / "
|
||||||
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
|
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
|
||||||
f"{fmt_cost(usage.get('cost_usd'))}"
|
f"{fmt_cost(usage.get('cost_usd'))}"
|
||||||
)
|
)
|
||||||
|
links = artifact_links(agent, repo, branch, work_item_id, pr_number)
|
||||||
|
if links:
|
||||||
|
line += "\n" + "\n".join(links)
|
||||||
|
return line
|
||||||
|
|
||||||
|
|
||||||
|
# Per-agent artifact file under docs/work-items/{wid}/ (architect/developer use
|
||||||
|
# special handling for ADR dirs / PR links, see artifact_links()).
|
||||||
|
AGENT_ARTIFACT = {
|
||||||
|
"reviewer": ("Review", "12-review.md"),
|
||||||
|
"tester": ("Test report", "13-test-report.md"),
|
||||||
|
"deployer": ("Deploy log", "14-deploy-log.md"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def artifact_links(
|
||||||
|
agent: str,
|
||||||
|
repo: str | None,
|
||||||
|
branch: str | None,
|
||||||
|
work_item_id: str | None,
|
||||||
|
pr_number=None,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Markdown link(s) to the finishing agent's artifact(s) in Gitea.
|
||||||
|
|
||||||
|
Uses gitea_public_url (falls back to gitea_url) for clickable links, mirroring
|
||||||
|
the analyst doc links. Returns [] (never raises) when there is nothing to
|
||||||
|
link or the required context is missing. analyst is intentionally NOT handled
|
||||||
|
here — its richer doc list lives in stage_engine._build_analyst_ready_comment.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from .config import settings
|
||||||
|
owner = getattr(settings, "gitea_owner", "admin")
|
||||||
|
base = (
|
||||||
|
getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "")
|
||||||
|
).rstrip("/")
|
||||||
|
if not base or not repo:
|
||||||
|
return []
|
||||||
|
links: list[str] = []
|
||||||
|
|
||||||
|
if agent == "developer":
|
||||||
|
if branch:
|
||||||
|
links.append(
|
||||||
|
f"\U0001f4c2 [Branch {branch}]({base}/{owner}/{repo}/src/branch/{branch})"
|
||||||
|
)
|
||||||
|
if pr_number:
|
||||||
|
links.append(
|
||||||
|
f"\U0001f517 [PR #{pr_number}]({base}/{owner}/{repo}/pulls/{pr_number})"
|
||||||
|
)
|
||||||
|
return links
|
||||||
|
|
||||||
|
if agent == "architect":
|
||||||
|
if branch and work_item_id:
|
||||||
|
adr_dir = (
|
||||||
|
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||||
|
f"docs/work-items/{work_item_id}/06-adr"
|
||||||
|
)
|
||||||
|
links.append(f"\U0001f4d0 [ADR]({adr_dir})")
|
||||||
|
return links
|
||||||
|
|
||||||
|
spec = AGENT_ARTIFACT.get(agent)
|
||||||
|
if spec and branch and work_item_id:
|
||||||
|
label, fname = spec
|
||||||
|
href = (
|
||||||
|
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||||
|
f"docs/work-items/{work_item_id}/{fname}"
|
||||||
|
)
|
||||||
|
links.append(f"\U0001f4c4 [{label}]({href})")
|
||||||
|
return links
|
||||||
|
except Exception:
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
AGENT_ICON = {
|
AGENT_ICON = {
|
||||||
@@ -225,13 +399,22 @@ AGENT_ICON = {
|
|||||||
def task_usage_summary(task_id: int) -> dict:
|
def task_usage_summary(task_id: int) -> dict:
|
||||||
"""Aggregate agent_runs usage for a task.
|
"""Aggregate agent_runs usage for a task.
|
||||||
|
|
||||||
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
|
total_in counts the FULL input (input + cache_read + cache_creation), and
|
||||||
|
total_cached counts the cached portion (cache_read + cache_creation).
|
||||||
|
COALESCE(...,0) keeps pre-existing rows (NULL cache_creation) from breaking.
|
||||||
|
|
||||||
|
Returns {total_in, total_cached, total_out, total_cost,
|
||||||
|
per_agent: [(agent, in, cached, out, cost), ...]}.
|
||||||
"""
|
"""
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
try:
|
try:
|
||||||
rows = conn.execute(
|
rows = conn.execute(
|
||||||
"SELECT agent, "
|
"SELECT agent, "
|
||||||
"COALESCE(SUM(input_tokens),0), "
|
"COALESCE(SUM(input_tokens),0) "
|
||||||
|
" + COALESCE(SUM(cache_read_tokens),0) "
|
||||||
|
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||||
|
"COALESCE(SUM(cache_read_tokens),0) "
|
||||||
|
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||||
"COALESCE(SUM(output_tokens),0), "
|
"COALESCE(SUM(output_tokens),0), "
|
||||||
"COALESCE(SUM(cost_usd),0.0) "
|
"COALESCE(SUM(cost_usd),0.0) "
|
||||||
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
|
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
|
||||||
@@ -239,12 +422,14 @@ def task_usage_summary(task_id: int) -> dict:
|
|||||||
).fetchall()
|
).fetchall()
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
|
per_agent = [(r[0], int(r[1]), int(r[2]), int(r[3]), float(r[4])) for r in rows]
|
||||||
total_in = sum(r[1] for r in per_agent)
|
total_in = sum(r[1] for r in per_agent)
|
||||||
total_out = sum(r[2] for r in per_agent)
|
total_cached = sum(r[2] for r in per_agent)
|
||||||
total_cost = sum(r[3] for r in per_agent)
|
total_out = sum(r[3] for r in per_agent)
|
||||||
|
total_cost = sum(r[4] for r in per_agent)
|
||||||
return {
|
return {
|
||||||
"total_in": total_in,
|
"total_in": total_in,
|
||||||
|
"total_cached": total_cached,
|
||||||
"total_out": total_out,
|
"total_out": total_out,
|
||||||
"total_cost": total_cost,
|
"total_cost": total_cost,
|
||||||
"per_agent": per_agent,
|
"per_agent": per_agent,
|
||||||
@@ -254,15 +439,26 @@ def task_usage_summary(task_id: int) -> dict:
|
|||||||
def task_summary_comment(task_id: int) -> str:
|
def task_summary_comment(task_id: int) -> str:
|
||||||
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
|
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
|
||||||
s = task_usage_summary(task_id)
|
s = task_usage_summary(task_id)
|
||||||
|
cached = s.get("total_cached", 0)
|
||||||
|
head_in = (
|
||||||
|
f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434 ({fmt_tokens(cached)} cached)"
|
||||||
|
if cached > 0
|
||||||
|
else f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434"
|
||||||
|
)
|
||||||
lines = [
|
lines = [
|
||||||
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
|
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
|
||||||
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
|
f"{head_in} / "
|
||||||
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
|
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
|
||||||
f"{fmt_cost(s['total_cost'])}"
|
f"{fmt_cost(s['total_cost'])}"
|
||||||
]
|
]
|
||||||
for agent, ti, to, cost in s["per_agent"]:
|
for agent, ti, tc, to, cost in s["per_agent"]:
|
||||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||||
|
in_str = (
|
||||||
|
f"{fmt_tokens(ti)} in ({fmt_tokens(tc)} cached)"
|
||||||
|
if tc > 0
|
||||||
|
else f"{fmt_tokens(ti)} in"
|
||||||
|
)
|
||||||
lines.append(
|
lines.append(
|
||||||
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
f"\u2022 {name}: {in_str} / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||||
)
|
)
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|||||||
@@ -216,12 +216,31 @@ async def handle_ci_status(payload: dict):
|
|||||||
else:
|
else:
|
||||||
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
|
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
|
||||||
|
|
||||||
elif state == "failure":
|
elif state == "failure" and current_stage == "development":
|
||||||
# S-1: Gitea CI is NOT the authoritative gate anymore (the orchestrator runs
|
# CI is the authoritative gate for development -> review.
|
||||||
# tests locally via check_tests_local). Gitea CI is often unconfigured, so a
|
# On red CI: notify, then bounce the task back to the developer (capped retries),
|
||||||
# "failure"/empty status here is not actionable. Log only, do not alert.
|
# symmetric to the review REQUEST_CHANGES path.
|
||||||
logger.debug(f"Task {task_id}: Gitea CI state='failure' on branch '{branch}' "
|
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
|
||||||
f"(non-authoritative, suppressed — local tests are the gate)")
|
conn = get_db()
|
||||||
|
retry_count = conn.execute(
|
||||||
|
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
|
||||||
|
(task_id,),
|
||||||
|
).fetchone()["cnt"]
|
||||||
|
conn.close()
|
||||||
|
if retry_count < MAX_DEV_RETRIES:
|
||||||
|
# task already on 'development' — no stage change needed, just relaunch developer
|
||||||
|
try:
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
|
||||||
|
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
|
||||||
|
)
|
||||||
|
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
|
||||||
|
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
|
||||||
|
except Exception as e:
|
||||||
|
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
|
||||||
|
else:
|
||||||
|
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
|
||||||
|
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
|
||||||
|
|
||||||
|
|
||||||
async def handle_pr(payload: dict):
|
async def handle_pr(payload: dict):
|
||||||
@@ -315,6 +334,20 @@ async def handle_pr(payload: dict):
|
|||||||
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
|
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
|
||||||
|
|
||||||
elif action == "closed" and pr.get("merged", False):
|
elif action == "closed" and pr.get("merged", False):
|
||||||
|
# BUG 8 (second door): at the deploy stage `done` is gated by the
|
||||||
|
# deployer's verdict (check_deploy_status via advance_stage), NOT by the
|
||||||
|
# fact that the PR was merged. The deployer merges the PR at the START of
|
||||||
|
# its run, so a merged webhook arrives ~30s later while the deployer is
|
||||||
|
# still working — blindly setting done here would fake-complete the task
|
||||||
|
# and discard a later deploy_status: FAILED verdict. advance_stage will
|
||||||
|
# drive deploy→done (and Plane→Done) when the deployer job finishes.
|
||||||
|
# For every OTHER stage the merge-driven done behaviour is preserved.
|
||||||
|
if current_stage == "deploy":
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: PR merged at deploy stage — done gated by "
|
||||||
|
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
|
||||||
|
)
|
||||||
|
return
|
||||||
update_task_stage(task_id, "done")
|
update_task_stage(task_id, "done")
|
||||||
notify_stage_change(task_id, current_stage, "done")
|
notify_stage_change(task_id, current_stage, "done")
|
||||||
logger.info(f"Task {task_id}: PR merged, stage → done")
|
logger.info(f"Task {task_id}: PR merged, stage → done")
|
||||||
|
|||||||
@@ -98,10 +98,12 @@ async def plane_webhook(request: Request):
|
|||||||
# QG-0 sanity log here (no branch, no analyst, no task row).
|
# QG-0 sanity log here (no branch, no analyst, no task row).
|
||||||
await handle_work_item_created(data, project_id)
|
await handle_work_item_created(data, project_id)
|
||||||
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
|
elif (event == "work_item.updated") or (event == "issue" and action == "updated"):
|
||||||
# Feature 1 & 2: status changes drive the pipeline.
|
# Status-only verdict model: status changes drive the pipeline.
|
||||||
# Backlog/Todo/Triage -> In Progress : START the pipeline (idempotent)
|
# Backlog/Todo/Triage -> In Progress : START pipeline, or relaunch the
|
||||||
# -> Approved : advance (== :approved: comment)
|
# stage agent if returned from
|
||||||
# -> Rejected : rollback (== :rejected: comment)
|
# Needs Input.
|
||||||
|
# -> Approved : advance to the next stage.
|
||||||
|
# -> Rejected : rollback (reason from latest comment).
|
||||||
await handle_issue_updated(data, project_id)
|
await handle_issue_updated(data, project_id)
|
||||||
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
|
elif (event == "comment.created") or (event == "issue_comment" and action == "created"):
|
||||||
await handle_comment(data, project_id)
|
await handle_comment(data, project_id)
|
||||||
@@ -127,11 +129,11 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
|
|||||||
"""Feature 1 & 2: react to a Plane issue status change.
|
"""Feature 1 & 2: react to a Plane issue status change.
|
||||||
|
|
||||||
Routes the NEW state UUID (data.state.id) to:
|
Routes the NEW state UUID (data.state.id) to:
|
||||||
- in_progress : start the pipeline if this issue has no task yet
|
- in_progress : start the pipeline if this issue has no task yet; if a
|
||||||
(idempotent — an existing task is NOT restarted; protects handle_comment
|
task already exists and the stage agent is idle (returned from Needs
|
||||||
which also flips issues to In Progress during approve/answer flows).
|
Input), relaunch the stage agent so it reads Slava's fresh comments.
|
||||||
- approved : same as a :approved: comment (advance current stage).
|
- approved : advance to the next stage.
|
||||||
- rejected : same as a :rejected: comment (rollback + relaunch).
|
- rejected : rollback to the previous stage (reason from latest comment).
|
||||||
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
|
Any other status (Needs Input, In Review, Blocked, Done, board stages, etc.)
|
||||||
is ignored here — those are statuses the orchestrator itself sets.
|
is ignored here — those are statuses the orchestrator itself sets.
|
||||||
"""
|
"""
|
||||||
@@ -154,31 +156,105 @@ async def handle_issue_updated(data: dict, project_id: str = ""):
|
|||||||
|
|
||||||
|
|
||||||
async def handle_status_start(data: dict, project_id: str = ""):
|
async def handle_status_start(data: dict, project_id: str = ""):
|
||||||
"""Feature 1: an issue moved into In Progress -> start the pipeline.
|
"""An issue moved into In Progress.
|
||||||
|
|
||||||
Idempotent: if a task already exists for this plane_id, do nothing (no dup,
|
Two cases under the status-only verdict model:
|
||||||
no analyst restart). This is what makes handle_comment's set_issue_in_progress
|
|
||||||
safe — by then the task already exists, so the start is skipped.
|
1. No task yet for this plane_id -> START the pipeline (start_pipeline).
|
||||||
|
|
||||||
|
2. A task already exists -> this is Slava returning the issue from
|
||||||
|
Needs Input to In Progress after answering the analyst's questions. We
|
||||||
|
must RELAUNCH the current stage's agent so it reads the fresh comments
|
||||||
|
from Plane (the answer-to-questions flow used to live in handle_comment;
|
||||||
|
it is now status-driven).
|
||||||
|
|
||||||
|
KEY FORK — telling "answer to questions" apart from a plain duplicate In
|
||||||
|
Progress webhook (the dedup-protection case):
|
||||||
|
|
||||||
|
The tasks table stores no Plane status, and the issue.updated payload only
|
||||||
|
carries the NEW state (In Progress), so we cannot read the previous status
|
||||||
|
from here. Instead we use the only reliable local signal: whether the
|
||||||
|
stage's agent is currently in flight.
|
||||||
|
|
||||||
|
- The orchestrator sets In Progress itself while an agent runs. When the
|
||||||
|
agent FINISHES it leaves the issue in Needs Input or In Review and has
|
||||||
|
NO queued/running job. So: an existing task with NO active job means the
|
||||||
|
agent is idle / waiting -> a return to In Progress is a genuine relaunch
|
||||||
|
request -> enqueue the stage agent.
|
||||||
|
- If a queued/running job already exists for the task, the agent is busy
|
||||||
|
(or a duplicate webhook arrived) -> SKIP (no double launch). The events
|
||||||
|
de-dup at the top of plane_webhook already absorbs identical webhook
|
||||||
|
bodies; this job guard additionally covers distinct webhooks fired while
|
||||||
|
a job is still pending/running.
|
||||||
"""
|
"""
|
||||||
|
from ..db import has_active_job_for_task
|
||||||
|
|
||||||
plane_id = str(data.get("id") or "")
|
plane_id = str(data.get("id") or "")
|
||||||
existing = get_task_by_plane_id(plane_id)
|
existing = get_task_by_plane_id(plane_id)
|
||||||
if existing:
|
|
||||||
|
if not existing:
|
||||||
|
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
|
||||||
|
await start_pipeline(data, project_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
task_id = existing["id"]
|
||||||
|
current_stage = existing["stage"]
|
||||||
|
repo = existing["repo"]
|
||||||
|
work_item_id = existing.get("work_item_id", "")
|
||||||
|
branch = existing.get("branch", "")
|
||||||
|
|
||||||
|
# Duplicate / busy guard: a job is already pending or running for this task.
|
||||||
|
if has_active_job_for_task(task_id):
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Status->In Progress for {plane_id}: task already exists "
|
f"Status->In Progress for {plane_id}: task {task_id} already has an "
|
||||||
f"(stage={existing.get('stage')}), not restarting"
|
f"active job (stage={current_stage}), not relaunching"
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
logger.info(f"Status->In Progress for {plane_id}: starting pipeline")
|
|
||||||
await start_pipeline(data, project_id)
|
# Agent is idle -> Slava answered questions and returned the issue to In
|
||||||
|
# Progress. Relaunch the current stage's agent to read the fresh comments.
|
||||||
|
from ..plane_sync import STAGE_AUTHORS, add_comment as _add_comment
|
||||||
|
stage_agent = STAGE_AUTHORS.get(current_stage)
|
||||||
|
if not stage_agent:
|
||||||
|
logger.info(
|
||||||
|
f"Status->In Progress for {plane_id}: no agent for stage "
|
||||||
|
f"'{current_stage}', not relaunching"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||||
|
f"Stage: {current_stage}\nNote: Stakeholder returned the issue to In "
|
||||||
|
f"Progress (answered your questions). Read the latest comments in Plane "
|
||||||
|
f"and revise your artifacts."
|
||||||
|
)
|
||||||
|
job_id = enqueue_job(stage_agent, repo, task_desc, task_id=task_id)
|
||||||
|
logger.info(
|
||||||
|
f"Task {task_id}: returned to In Progress (Needs Input answered), "
|
||||||
|
f"relaunched {stage_agent} for stage {current_stage} (job_id={job_id})"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
_add_comment(
|
||||||
|
work_item_id,
|
||||||
|
"\U0001f504 \u0410\u0433\u0435\u043d\u0442 \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.",
|
||||||
|
author=stage_agent,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to post relaunch comment for {work_item_id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
async def handle_verdict(data: dict, project_id: str, approved: bool):
|
async def handle_verdict(data: dict, project_id: str, approved: bool):
|
||||||
"""Feature 2 (variant B): a status verdict mirrors the comment verdicts.
|
"""Status-only verdict: a Plane status change drives advance / rollback.
|
||||||
|
|
||||||
Approved status == :approved: comment -> _try_advance_stage.
|
Approved status -> _try_advance_stage. We do NOT touch the issue status here:
|
||||||
Rejected status == :rejected: comment -> rollback to previous stage + relaunch
|
_try_advance_stage -> advance_stage -> plane_notify_stage already PATCHes the
|
||||||
(reason is unknown from a status change; Slava writes it in a separate
|
issue to the NEXT stage's status. The old set_issue_in_progress call reset
|
||||||
comment, so we pass a fixed note).
|
the status to In Progress first, which made the board flicker In Progress
|
||||||
|
before the next stage (part of bug 3); it is removed.
|
||||||
|
|
||||||
|
Rejected status -> rollback to the previous stage. The reason is pulled from
|
||||||
|
the issue's latest comment (Slava writes the reason in a comment before/with
|
||||||
|
flipping the status to Rejected).
|
||||||
"""
|
"""
|
||||||
plane_id = str(data.get("id") or "")
|
plane_id = str(data.get("id") or "")
|
||||||
task = get_task_by_plane_id(plane_id)
|
task = get_task_by_plane_id(plane_id)
|
||||||
@@ -193,19 +269,68 @@ async def handle_verdict(data: dict, project_id: str, approved: bool):
|
|||||||
branch = task.get("branch", "")
|
branch = task.get("branch", "")
|
||||||
|
|
||||||
if approved:
|
if approved:
|
||||||
from ..plane_sync import set_issue_in_progress
|
# NOTE: no set_issue_in_progress here — _try_advance_stage sets the next
|
||||||
set_issue_in_progress(work_item_id)
|
# stage's status itself (advance_stage -> plane_notify_stage).
|
||||||
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
|
logger.info(f"Task {task_id}: Approved status -> advance from {current_stage}")
|
||||||
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
|
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Rejected: mirror the :rejected: comment rollback branch.
|
# Rejected: pull the rejection reason from the issue's latest comment.
|
||||||
reason = "(rejected via status, see latest comment)"
|
issue_id = task.get("plane_issue_id") or task.get("plane_id") or plane_id
|
||||||
|
reason = _latest_comment_reason(issue_id, repo, project_id)
|
||||||
await _rollback_stage(
|
await _rollback_stage(
|
||||||
task_id, current_stage, repo, work_item_id, branch, reason
|
task_id, current_stage, repo, work_item_id, branch, reason
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _comment_plain_text(comment: dict) -> str:
    """Return a comment's body as plain text ('' when it has none).

    Takes the first non-empty of comment_stripped / comment_html / comment,
    removes HTML tags, then decodes HTML entities so '&amp;' or '&#39;' do
    not leak into the reject reason.
    """
    import html

    raw = (
        comment.get("comment_stripped")
        or comment.get("comment_html")
        or comment.get("comment")
        or ""
    )
    without_tags = re.sub(r"<[^>]+>", "", str(raw))
    return html.unescape(without_tags).strip()


def _latest_comment_reason(issue_id: str, repo: str, project_id: str = "") -> str:
    """Fetch the issue's most recent comment as the reject reason.

    The stakeholder writes the reason in a comment before/with flipping the
    status to Rejected, so the newest comment (by ``created_at``) is used.

    Args:
        issue_id: Plane issue id whose comments are fetched.
        repo: Repository name, used to resolve the Plane project id.
        project_id: Project id used when ``repo`` resolves to no project;
            the module default applies when this is empty too.

    Returns:
        The comment text (tags stripped, entities decoded, at most 300
        characters), or a fixed fallback when there is no issue id, no usable
        comment, a non-200 response, or the request fails.

    NOTE(review): httpx.get is synchronous and this helper is called from the
    async handle_verdict, so it presumably blocks the event loop for up to
    the 10 s timeout — confirm, and consider an async client.
    """
    from ..plane_sync import (
        PLANE_BASE,
        PLANE_HEADERS,
        WORKSPACE,
        PROJECT_ID as _DEFAULT_PROJECT_ID,
    )

    fallback = "Rejected via status, no reason comment"
    if not issue_id:
        return fallback

    _proj = get_project_by_repo(repo)
    pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
    url = (
        f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{pid}/issues/"
        f"{issue_id}/comments/"
    )
    try:
        resp = httpx.get(url, headers=PLANE_HEADERS, timeout=10)
        if resp.status_code != 200:
            logger.warning(
                f"reject-reason: GET comments for {issue_id} returned "
                f"{resp.status_code}"
            )
            return fallback

        payload = resp.json()
        # The endpoint may answer with a paginated {"results": [...]} object
        # or with a bare list; anything else is treated as "no comments".
        if isinstance(payload, dict):
            payload = payload.get("results")
        comments = [c for c in payload or [] if isinstance(c, dict)]
        if not comments:
            return fallback

        # Pick the newest comment by comparing created_at values as strings
        # (assumes sortable ISO-style timestamps — TODO confirm).
        latest = max(comments, key=lambda c: c.get("created_at", "") or "")
        text = _comment_plain_text(latest)
        return text[:300] if text else fallback
    except Exception as e:
        # Best-effort: the rollback must proceed even without a reason.
        logger.error(f"reject-reason: failed to fetch comments for {issue_id}: {e}")
        return fallback
|
||||||
|
|
||||||
|
|
||||||
async def handle_work_item_created(data: dict, project_id: str = ""):
|
async def handle_work_item_created(data: dict, project_id: str = ""):
|
||||||
"""Feature 1: creation does NOT start the pipeline anymore.
|
"""Feature 1: creation does NOT start the pipeline anymore.
|
||||||
|
|
||||||
@@ -262,22 +387,35 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
|||||||
repo = proj.repo
|
repo = proj.repo
|
||||||
plane_project_id = proj.plane_project_id
|
plane_project_id = proj.plane_project_id
|
||||||
|
|
||||||
# BUG 1: Plane's issue.updated webhook (status change -> In Progress) sends
|
# BUG 1 + BUG B: Plane's issue.updated webhook (status change -> In Progress)
|
||||||
# only the CHANGED fields, so description / description_stripped are usually
|
# sends only the CHANGED fields, so BOTH description / description_stripped
|
||||||
# empty here even though the issue HAS a description. If the payload's
|
# AND name are usually empty here even though the issue HAS them. Pull the
|
||||||
# description is missing/too short, pull the full one from the Plane issue
|
# full title + description from the Plane issue detail API in a SINGLE GET
|
||||||
# detail API (same GET endpoint + shared token already used by
|
# (fetch_issue_fields: same endpoint + shared token already used by
|
||||||
# fetch_issue_sequence_id) before QG-0 runs. If the API is also empty, QG-0
|
# fetch_issue_sequence_id) before QG-0 and before the branch slug is built.
|
||||||
# legitimately fails (truly empty ticket).
|
# If the API is also empty, QG-0 legitimately fails (truly empty ticket) and
|
||||||
if not description or len(description.strip()) < 20:
|
# name falls back to "untitled".
|
||||||
from ..plane_sync import fetch_issue_description
|
name_missing = (not name) or name.strip().lower() == "untitled" or len(name.strip()) < 3
|
||||||
fetched = fetch_issue_description(plane_id, plane_project_id)
|
desc_missing = (not description) or len(description.strip()) < 20
|
||||||
if fetched and len(fetched.strip()) >= len(description.strip()):
|
if name_missing or desc_missing:
|
||||||
description = fetched
|
from ..plane_sync import fetch_issue_fields
|
||||||
|
fetched_name, fetched_desc = fetch_issue_fields(plane_id, plane_project_id)
|
||||||
|
if desc_missing and fetched_desc and len(fetched_desc.strip()) >= len(description.strip()):
|
||||||
|
description = fetched_desc
|
||||||
logger.info(
|
logger.info(
|
||||||
f"start_pipeline: pulled description from Plane API for {plane_id} "
|
f"start_pipeline: pulled description from Plane API for {plane_id} "
|
||||||
f"({len(description.strip())} chars)"
|
f"({len(description.strip())} chars)"
|
||||||
)
|
)
|
||||||
|
if name_missing and fetched_name and len(fetched_name.strip()) >= 3:
|
||||||
|
name = fetched_name
|
||||||
|
logger.info(
|
||||||
|
f"start_pipeline: pulled name from Plane API for {plane_id} "
|
||||||
|
f"('{name}')"
|
||||||
|
)
|
||||||
|
# BUG B fallback: if name is still empty/blank after the API pull, keep the
|
||||||
|
# legacy "untitled" so the slug/branch build never crashes on an empty name.
|
||||||
|
if not name or not name.strip():
|
||||||
|
name = "untitled"
|
||||||
|
|
||||||
# QG-0 validation (hard gate on pipeline start)
|
# QG-0 validation (hard gate on pipeline start)
|
||||||
errors = _qg0_errors(name, description)
|
errors = _qg0_errors(name, description)
|
||||||
@@ -356,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
|||||||
# Insert task into DB
|
# Insert task into DB
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)",
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
||||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id),
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||||
|
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
|
||||||
)
|
)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
@@ -384,7 +523,10 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
|||||||
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
|
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
|
||||||
if task_row:
|
if task_row:
|
||||||
task_id = task_row[0]
|
task_id = task_row[0]
|
||||||
task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}"
|
task_desc = (
|
||||||
|
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||||
|
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
|
||||||
|
)
|
||||||
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
||||||
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
|
||||||
# Post start comment to Plane
|
# Post start comment to Plane
|
||||||
@@ -395,108 +537,34 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
|||||||
|
|
||||||
|
|
||||||
async def handle_comment(data: dict, project_id: str = ""):
|
async def handle_comment(data: dict, project_id: str = ""):
|
||||||
|
"""Status-only verdict model: comments NEVER drive the pipeline.
|
||||||
|
|
||||||
|
The whole comment-based control mechanism (``:approved:`` / ``:rejected:``
|
||||||
|
and the analysis answer-to-questions flow) was removed. It caused bug 3
|
||||||
|
(echo self-hit): the analyst posts its own "waiting for approval" comment,
|
||||||
|
handle_comment catches its own comment and reverts In Review -> In Progress.
|
||||||
|
|
||||||
|
Comments are now logged only — no status change, no enqueue, no side effect.
|
||||||
|
The pipeline is driven solely by status changes (handle_issue_updated):
|
||||||
|
- Approved -> advance
|
||||||
|
- Rejected -> rollback (reason pulled from the latest comment)
|
||||||
|
- In Progress (returned from Needs Input) -> relaunch the stage agent
|
||||||
"""
|
"""
|
||||||
Handle comment event — check for :approved: or :rejected:.
|
plane_id = str(
|
||||||
Advance or rollback stage accordingly.
|
data.get("work_item_id") or data.get("issue_id") or data.get("issue") or ""
|
||||||
"""
|
)
|
||||||
comment_body = data.get("comment_stripped", data.get("comment", data.get("body", data.get("comment_html", ""))))
|
logger.info(
|
||||||
plane_id = str(data.get("work_item_id") or data.get("issue_id") or data.get("issue") or "")
|
f"comment.created for {plane_id}: logged only, no pipeline action "
|
||||||
|
f"(status-only verdict model)"
|
||||||
if not plane_id:
|
)
|
||||||
logger.warning("Comment event without work_item_id, skipping")
|
|
||||||
return
|
|
||||||
|
|
||||||
task = get_task_by_plane_id(plane_id)
|
|
||||||
if not task:
|
|
||||||
logger.warning(f"No task found for plane_id={plane_id}")
|
|
||||||
return
|
|
||||||
|
|
||||||
task_id = task["id"]
|
|
||||||
current_stage = task["stage"]
|
|
||||||
repo = task["repo"]
|
|
||||||
work_item_id = task.get("work_item_id", "")
|
|
||||||
branch = task.get("branch", "")
|
|
||||||
|
|
||||||
if ":rejected:" in comment_body:
|
|
||||||
# Extract reason (text after :rejected:)
|
|
||||||
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
|
|
||||||
await _rollback_stage(task_id, current_stage, repo, work_item_id, branch, reason)
|
|
||||||
return
|
|
||||||
|
|
||||||
if ":approved:" in comment_body:
|
|
||||||
from ..plane_sync import set_issue_in_progress
|
|
||||||
set_issue_in_progress(work_item_id)
|
|
||||||
# Try to advance stage
|
|
||||||
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Task 3: If neither :approved: nor :rejected: — check if this is an answer to questions
|
|
||||||
if current_stage == "analysis":
|
|
||||||
from ..plane_sync import PLANE_STATES, set_issue_in_progress
|
|
||||||
issue_id = task.get("plane_issue_id") or task.get("plane_id")
|
|
||||||
if not issue_id:
|
|
||||||
issue_id = plane_id
|
|
||||||
if issue_id:
|
|
||||||
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE
|
|
||||||
from ..plane_sync import PROJECT_ID as _DEFAULT_PROJECT_ID
|
|
||||||
# ORCH-6: route to this task's own Plane project (resolved from repo).
|
|
||||||
_proj = get_project_by_repo(repo)
|
|
||||||
_pid = _proj.plane_project_id if _proj else (project_id or _DEFAULT_PROJECT_ID)
|
|
||||||
import httpx as _httpx
|
|
||||||
try:
|
|
||||||
_resp = _httpx.get(
|
|
||||||
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{_pid}/issues/{issue_id}/",
|
|
||||||
headers=PLANE_HEADERS, timeout=10
|
|
||||||
)
|
|
||||||
if _resp.status_code == 200:
|
|
||||||
issue_data = _resp.json()
|
|
||||||
if issue_data.get("state") == PLANE_STATES["needs_input"]:
|
|
||||||
# Task 11: Check analyst retry count (max 3 question rounds)
|
|
||||||
conn3 = get_db()
|
|
||||||
analyst_runs = conn3.execute(
|
|
||||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
|
|
||||||
(task_id,)
|
|
||||||
).fetchone()[0]
|
|
||||||
conn3.close()
|
|
||||||
|
|
||||||
if analyst_runs >= 4: # initial + 3 retries
|
|
||||||
from ..plane_sync import set_issue_blocked, add_comment as _pc
|
|
||||||
set_issue_blocked(work_item_id)
|
|
||||||
_pc(
|
|
||||||
work_item_id,
|
|
||||||
"\U0001f6a8 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0439 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. Analyst \u043d\u0435 \u043c\u043e\u0436\u0435\u0442 \u0441\u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0422\u0417. "
|
|
||||||
"\u0422\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0431\u043e\u043b\u0435\u0435 \u0434\u0435\u0442\u0430\u043b\u044c\u043d\u043e\u0435 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043b\u0438 \u0432\u0441\u0442\u0440\u0435\u0447\u0430.",
|
|
||||||
author="analyst",
|
|
||||||
)
|
|
||||||
from ..notifications import send_telegram
|
|
||||||
send_telegram(f"\U0001f6a8 {work_item_id}: 3 \u0440\u0430\u0443\u043d\u0434\u0430 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432 analyst'\u0430 \u0438\u0441\u0447\u0435\u0440\u043f\u0430\u043d\u044b. \u041d\u0443\u0436\u043d\u0430 \u043f\u043e\u043c\u043e\u0449\u044c.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# This is an answer to analyst's questions — relaunch
|
|
||||||
set_issue_in_progress(work_item_id)
|
|
||||||
task_desc = (
|
|
||||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
|
||||||
f"Stage: analysis\nNote: Stakeholder answered your questions. "
|
|
||||||
f"Read the latest comment in Plane and revise your artifacts.\n"
|
|
||||||
f"Answer: {comment_body[:500]}"
|
|
||||||
)
|
|
||||||
new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id)
|
|
||||||
from ..plane_sync import add_comment as _pc2
|
|
||||||
_pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.", author="analyst")
|
|
||||||
logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})")
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to check issue state: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
async def _rollback_stage(
|
async def _rollback_stage(
|
||||||
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
|
task_id: int, current_stage: str, repo: str, work_item_id: str, branch: str,
|
||||||
reason: str,
|
reason: str,
|
||||||
):
|
):
|
||||||
"""Shared :rejected: / Rejected-status rollback (Feature 2 variant B).
|
"""Rollback triggered by a status change to Rejected.
|
||||||
|
|
||||||
Both the :rejected: comment and a status change to Rejected funnel here so
|
|
||||||
the two mechanisms behave identically:
|
|
||||||
- at analysis: relaunch the analyst with the rejection reason;
|
- at analysis: relaunch the analyst with the rejection reason;
|
||||||
- otherwise: roll back to the previous stage and relaunch its agent
|
- otherwise: roll back to the previous stage and relaunch its agent
|
||||||
(via the existing rollback notify + an enqueue of the prev-stage agent).
|
(via the existing rollback notify + an enqueue of the prev-stage agent).
|
||||||
@@ -565,10 +633,10 @@ async def _try_advance_stage(
|
|||||||
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
is synchronous. We run it off the event loop via asyncio.to_thread so there
|
||||||
is exactly one implementation shared with the launcher.
|
is exactly one implementation shared with the launcher.
|
||||||
|
|
||||||
finished_agent is None on this webhook path (a human :approved: comment, not
|
finished_agent is None on this webhook path (a human Approved status change,
|
||||||
a finished agent), so the agent-specific rollback branches inside the engine
|
not a finished agent), so the agent-specific rollback branches inside the
|
||||||
intentionally do not trigger — identical to the old plane behavior, which
|
engine intentionally do not trigger — the webhook path only runs the QG and
|
||||||
only ran the QG and either advanced or reported the failure.
|
either advances or reports the failure.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
from ..stage_engine import advance_stage
|
from ..stage_engine import advance_stage
|
||||||
|
|||||||
74
tests/test_analyst_comment.py
Normal file
74
tests/test_analyst_comment.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
"""BUG C: analyst "artifacts ready" comment under the status-only model.
|
||||||
|
|
||||||
|
The comment must ask for the **Approved** status (not the obsolete
|
||||||
|
":approved:" reaction, not moving back to "In Progress") and link only the
|
||||||
|
docs that actually exist in the worktree.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyst_comment_asks_approved_with_links(monkeypatch, tmp_path):
|
||||||
|
from src import stage_engine as SE
|
||||||
|
|
||||||
|
# Worktree with only SOME of the candidate docs present.
|
||||||
|
wt = tmp_path / "wt"
|
||||||
|
docs = wt / "docs" / "work-items" / "ET-011"
|
||||||
|
docs.mkdir(parents=True)
|
||||||
|
for fname in ("00-business-request.md", "01-brd.md", "02-trz.md",
|
||||||
|
"03-acceptance-criteria.md", "04-test-plan.yaml"):
|
||||||
|
(docs / fname).write_text("x")
|
||||||
|
# 04b-ui-test-cases.md intentionally absent -> must NOT be linked
|
||||||
|
|
||||||
|
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
|
||||||
|
# public URL set -> links must be built from it (not gitea_url)
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_public_url", "https://git.mva154.duckdns.org")
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
|
||||||
|
|
||||||
|
html = SE._build_analyst_ready_comment(
|
||||||
|
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
|
||||||
|
)
|
||||||
|
|
||||||
|
# text asks for the Approved STATUS, not the obsolete mechanisms
|
||||||
|
assert "Approved" in html
|
||||||
|
assert ":approved:" not in html
|
||||||
|
assert "In Progress" not in html
|
||||||
|
assert "Rejected" in html
|
||||||
|
# clickable links to docs that ACTUALLY exist
|
||||||
|
assert "<a href=" in html
|
||||||
|
base = ("https://git.mva154.duckdns.org/admin/enduro-trails/src/branch/"
|
||||||
|
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
|
||||||
|
assert base + "01-brd.md" in html
|
||||||
|
assert base + "04-test-plan.yaml" in html
|
||||||
|
# the missing file is NOT invented
|
||||||
|
assert "04b-ui-test-cases.md" not in html
|
||||||
|
# internal git url must NOT appear in clickable links
|
||||||
|
assert "localhost:3000" not in html
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyst_comment_falls_back_to_gitea_url(monkeypatch, tmp_path):
|
||||||
|
"""When gitea_public_url is empty, links fall back to gitea_url."""
|
||||||
|
from src import stage_engine as SE
|
||||||
|
|
||||||
|
wt = tmp_path / "wt"
|
||||||
|
docs = wt / "docs" / "work-items" / "ET-011"
|
||||||
|
docs.mkdir(parents=True)
|
||||||
|
(docs / "01-brd.md").write_text("x")
|
||||||
|
|
||||||
|
monkeypatch.setattr(SE, "get_worktree_path", lambda repo, branch: str(wt))
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_url", "http://localhost:3000")
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_public_url", "")
|
||||||
|
monkeypatch.setattr(SE.settings, "gitea_owner", "admin")
|
||||||
|
|
||||||
|
html = SE._build_analyst_ready_comment(
|
||||||
|
"enduro-trails", "ET-011", "feature/ET-011-gpx-upload-feature"
|
||||||
|
)
|
||||||
|
|
||||||
|
base = ("http://localhost:3000/admin/enduro-trails/src/branch/"
|
||||||
|
"feature/ET-011-gpx-upload-feature/docs/work-items/ET-011/")
|
||||||
|
assert base + "01-brd.md" in html
|
||||||
@@ -109,17 +109,19 @@ def _to_in_progress_no_desc(plane_id="bug1"):
|
|||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||||
@patch("src.plane_sync.fetch_issue_description",
|
@patch("src.plane_sync.fetch_issue_fields",
|
||||||
return_value="This is a sufficiently long description fetched from Plane API.")
|
return_value=("A valid backlog item title",
|
||||||
|
"This is a sufficiently long description fetched from Plane API."))
|
||||||
def test_status_start_fetches_description(
|
def test_status_start_fetches_description(
|
||||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||||
):
|
):
|
||||||
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
|
"""BUG 1: empty description in payload -> start_pipeline pulls it from the
|
||||||
Plane API -> QG-0 passes -> task created + analyst enqueued (NOT blocked)."""
|
Plane API (single fetch_issue_fields GET) -> QG-0 passes -> task created +
|
||||||
|
analyst enqueued (NOT blocked)."""
|
||||||
resp = _to_in_progress_no_desc("bug1")
|
resp = _to_in_progress_no_desc("bug1")
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
# description was pulled from the API
|
# name + description were pulled from the API in one call
|
||||||
mock_desc.assert_called_once()
|
mock_fields.assert_called_once()
|
||||||
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
|
# QG-0 passed -> task created and analyst launched (NOT set_issue_blocked)
|
||||||
assert _count("bug1") == 1
|
assert _count("bug1") == 1
|
||||||
assert _task("bug1")["stage"] == "analysis"
|
assert _task("bug1")["stage"] == "analysis"
|
||||||
@@ -131,15 +133,15 @@ def test_status_start_fetches_description(
|
|||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=42)
|
||||||
@patch("src.plane_sync.fetch_issue_description", return_value="")
|
@patch("src.plane_sync.fetch_issue_fields", return_value=("", ""))
|
||||||
def test_status_start_empty_api_still_blocks(
|
def test_status_start_empty_api_still_blocks(
|
||||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||||
):
|
):
|
||||||
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
|
"""BUG 1 negative path: if the API also returns empty, QG-0 legitimately
|
||||||
fails -> NO task is created (truly empty ticket)."""
|
fails -> NO task is created (truly empty ticket)."""
|
||||||
resp = _to_in_progress_no_desc("bug1-empty")
|
resp = _to_in_progress_no_desc("bug1-empty")
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
mock_desc.assert_called_once()
|
mock_fields.assert_called_once()
|
||||||
assert _count("bug1-empty") == 0
|
assert _count("bug1-empty") == 0
|
||||||
mock_enqueue.assert_not_called()
|
mock_enqueue.assert_not_called()
|
||||||
|
|
||||||
@@ -168,10 +170,11 @@ def test_work_item_id_uniqueness():
|
|||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
|
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=6)
|
||||||
@patch("src.plane_sync.fetch_issue_description",
|
@patch("src.plane_sync.fetch_issue_fields",
|
||||||
return_value="A sufficiently long description for QG-0 to pass cleanly.")
|
return_value=("Popup enduro trails feature",
|
||||||
|
"A sufficiently long description for QG-0 to pass cleanly."))
|
||||||
def test_collision_reassigns_in_start_pipeline(
|
def test_collision_reassigns_in_start_pipeline(
|
||||||
mock_desc, mock_seq, mock_branch, mock_docs, mock_enqueue
|
mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||||
):
|
):
|
||||||
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
|
"""BUG 2a end-to-end: ET-006 already exists -> a new In Progress issue whose
|
||||||
Plane sequence_id is also 6 must NOT reuse ET-006."""
|
Plane sequence_id is also 6 must NOT reuse ET-006."""
|
||||||
|
|||||||
175
tests/test_qg.py
175
tests/test_qg.py
@@ -17,7 +17,10 @@ from src.qg.checks import (
|
|||||||
check_ci_green,
|
check_ci_green,
|
||||||
check_review_approved,
|
check_review_approved,
|
||||||
check_tests_passed,
|
check_tests_passed,
|
||||||
|
check_tests_local,
|
||||||
|
check_deploy_status,
|
||||||
)
|
)
|
||||||
|
from src.stages import get_qg_for_stage
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@@ -186,3 +189,175 @@ class TestCheckTestsPassed:
|
|||||||
passed, reason = check_tests_passed("enduro-trails", "ET-001")
|
passed, reason = check_tests_passed("enduro-trails", "ET-001")
|
||||||
assert passed is False
|
assert passed is False
|
||||||
assert "not found" in reason.lower()
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckDeployStatus:
|
||||||
|
"""BUG 8: deploy -> done must be gated on the deployer's machine-readable
|
||||||
|
deploy_status verdict in 14-deploy-log.md frontmatter, NOT the LLM exit code
|
||||||
|
(always 0). Mirrors check_reviewer_verdict (reads ONLY the frontmatter field)."""
|
||||||
|
|
||||||
|
def _write_log(self, repo_dir, content):
|
||||||
|
wi_dir = repo_dir / "docs" / "work-items" / "ET-011"
|
||||||
|
wi_dir.mkdir(parents=True)
|
||||||
|
(wi_dir / "14-deploy-log.md").write_text(content)
|
||||||
|
|
||||||
|
def test_success_verdict_passes(self, setup_work_item_dir):
|
||||||
|
self._write_log(
|
||||||
|
setup_work_item_dir,
|
||||||
|
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is True
|
||||||
|
assert "SUCCESS" in reason
|
||||||
|
|
||||||
|
def test_failed_verdict_fails(self, setup_work_item_dir):
|
||||||
|
self._write_log(
|
||||||
|
setup_work_item_dir,
|
||||||
|
"---\ndeploy_status: FAILED\nversion: v0.0.3\n---\n\npermission denied.\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is False
|
||||||
|
assert "FAILED" in reason
|
||||||
|
|
||||||
|
def test_no_file_fails(self, setup_work_item_dir):
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is False
|
||||||
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
def test_no_field_fails(self, setup_work_item_dir):
|
||||||
|
# Frontmatter present but no deploy_status field -> must NOT pass.
|
||||||
|
self._write_log(
|
||||||
|
setup_work_item_dir,
|
||||||
|
"---\nversion: v0.0.3\n---\n\nStatus: FAILED (prose only).\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is False
|
||||||
|
|
||||||
|
def test_prose_only_no_frontmatter_fails(self, setup_work_item_dir):
|
||||||
|
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
|
||||||
|
self._write_log(
|
||||||
|
setup_work_item_dir,
|
||||||
|
"# Deploy log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is False
|
||||||
|
|
||||||
|
# --- ET-013 path-sync fix: log written to origin/main via separate PR ---
|
||||||
|
|
||||||
|
def test_origin_main_success_passes_when_absent_in_worktree(self, monkeypatch):
|
||||||
|
# Deployer merged 14-deploy-log.md into main via a separate PR; it is NOT
|
||||||
|
# in the feature worktree. Gate must recover it from origin/main -> PASS.
|
||||||
|
# (This is the exact ET-013 regression.)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.qg.checks._deploy_log_from_main",
|
||||||
|
lambda repo, wi: "---\ndeploy_status: SUCCESS\nversion: v0.0.5\n---\n\nLive.\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-013")
|
||||||
|
assert passed is True
|
||||||
|
assert "SUCCESS" in reason
|
||||||
|
|
||||||
|
def test_origin_main_failed_fails(self, monkeypatch):
|
||||||
|
# A genuine FAILED log in main must still fail.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.qg.checks._deploy_log_from_main",
|
||||||
|
lambda repo, wi: "---\ndeploy_status: FAILED\nversion: v0.0.5\n---\n\nboom.\n",
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-013")
|
||||||
|
assert passed is False
|
||||||
|
assert "FAILED" in reason
|
||||||
|
|
||||||
|
def test_absent_everywhere_fails(self, monkeypatch):
|
||||||
|
# Not in worktree and origin/main lookup yields nothing -> not found.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.qg.checks._deploy_log_from_main", lambda repo, wi: None
|
||||||
|
)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-013")
|
||||||
|
assert passed is False
|
||||||
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
@patch("src.qg.checks.subprocess.run")
|
||||||
|
@patch("src.qg.checks.os.path.isdir", return_value=True)
|
||||||
|
def test_fetch_failure_degrades_no_exception(self, mock_isdir, mock_run):
|
||||||
|
# git fetch/show raising (e.g. network) must degrade to "not found",
|
||||||
|
# never propagate an exception out of the gate.
|
||||||
|
import subprocess as _sp
|
||||||
|
mock_run.side_effect = _sp.TimeoutExpired(cmd="git", timeout=30)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-013")
|
||||||
|
assert passed is False
|
||||||
|
assert "not found" in reason.lower()
|
||||||
|
|
||||||
|
def test_worktree_log_short_circuits_main_lookup(self, setup_work_item_dir, monkeypatch):
|
||||||
|
# If the log IS present in the worktree, origin/main must NOT be consulted.
|
||||||
|
self._write_log(
|
||||||
|
setup_work_item_dir,
|
||||||
|
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
|
||||||
|
)
|
||||||
|
called = {"n": 0}
|
||||||
|
def _boom(repo, wi):
|
||||||
|
called["n"] += 1
|
||||||
|
return None
|
||||||
|
monkeypatch.setattr("src.qg.checks._deploy_log_from_main", _boom)
|
||||||
|
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||||
|
assert passed is True
|
||||||
|
assert called["n"] == 0
|
||||||
|
|
||||||
|
def test_deploy_stage_qg_is_check_deploy_status(self):
|
||||||
|
assert get_qg_for_stage("deploy") == "check_deploy_status"
|
||||||
|
|
||||||
|
def test_registered_in_qg_checks(self):
|
||||||
|
from src.qg.checks import QG_CHECKS
|
||||||
|
assert QG_CHECKS.get("check_deploy_status") is check_deploy_status
|
||||||
|
|
||||||
|
|
||||||
|
class TestDevelopmentStageQG:
|
||||||
|
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
|
||||||
|
gate), not the deprecated check_tests_local."""
|
||||||
|
|
||||||
|
def test_development_qg_is_check_ci_green(self):
|
||||||
|
assert get_qg_for_stage("development") == "check_ci_green"
|
||||||
|
|
||||||
|
def test_check_tests_local_is_deprecated_and_unwired(self):
|
||||||
|
# Kept in the registry for backward-compat, but not wired to any stage.
|
||||||
|
from src.qg.checks import QG_CHECKS
|
||||||
|
from src.stages import STAGE_TRANSITIONS
|
||||||
|
assert "check_tests_local" in QG_CHECKS
|
||||||
|
wired = {t.get("qg") for t in STAGE_TRANSITIONS.values()}
|
||||||
|
assert "check_tests_local" not in wired
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckTestsLocal:
|
||||||
|
"""BUG 5: check_tests_local must run pytest directly (not make, which is
|
||||||
|
not installed in the orchestrator container)."""
|
||||||
|
|
||||||
|
@patch("src.qg.checks.ensure_worktree")
|
||||||
|
@patch("subprocess.run")
|
||||||
|
def test_passes_on_returncode_zero(self, mock_run, mock_wt, tmp_path):
|
||||||
|
mock_wt.return_value = str(tmp_path)
|
||||||
|
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
|
||||||
|
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||||
|
assert passed is True
|
||||||
|
assert reason == "Local tests passed"
|
||||||
|
|
||||||
|
@patch("src.qg.checks.ensure_worktree")
|
||||||
|
@patch("subprocess.run")
|
||||||
|
def test_fails_on_nonzero_returncode(self, mock_run, mock_wt, tmp_path):
|
||||||
|
mock_wt.return_value = str(tmp_path)
|
||||||
|
mock_run.return_value = MagicMock(returncode=1, stdout="boom", stderr="trace")
|
||||||
|
passed, reason = check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||||
|
assert passed is False
|
||||||
|
assert "Local tests failed" in reason
|
||||||
|
|
||||||
|
@patch("src.qg.checks.ensure_worktree")
|
||||||
|
@patch("subprocess.run")
|
||||||
|
def test_invokes_pytest_not_make(self, mock_run, mock_wt, tmp_path):
|
||||||
|
"""The subprocess call must be pytest, from src/api, against ../../tests/."""
|
||||||
|
mock_wt.return_value = str(tmp_path)
|
||||||
|
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
|
||||||
|
check_tests_local("enduro-trails", "feature/ET-001-x")
|
||||||
|
args, kwargs = mock_run.call_args
|
||||||
|
cmd = args[0]
|
||||||
|
assert "make" not in cmd
|
||||||
|
assert cmd[:3] == ["python", "-m", "pytest"]
|
||||||
|
assert "../../tests/" in cmd
|
||||||
|
assert kwargs["cwd"] == os.path.join(str(tmp_path), "src", "api")
|
||||||
|
|
||||||
|
|||||||
@@ -69,6 +69,7 @@ def silence_side_effects(monkeypatch):
|
|||||||
"set_issue_needs_input",
|
"set_issue_needs_input",
|
||||||
"set_issue_in_progress",
|
"set_issue_in_progress",
|
||||||
"set_issue_blocked",
|
"set_issue_blocked",
|
||||||
|
"set_issue_done",
|
||||||
):
|
):
|
||||||
monkeypatch.setattr(stage_engine, name, MagicMock())
|
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||||
|
|
||||||
@@ -177,6 +178,40 @@ class TestHappyPathAgentSelection:
|
|||||||
assert res.enqueued_agent is None
|
assert res.enqueued_agent is None
|
||||||
assert _jobs() == []
|
assert _jobs() == []
|
||||||
|
|
||||||
|
def test_deploy_success_syncs_plane_to_terminal_done(self, monkeypatch):
|
||||||
|
"""FIX 3: a successful deploy->done forces the Plane issue to terminal Done.
|
||||||
|
|
||||||
|
Previously the task could stick on In Progress because the merge webhook
|
||||||
|
completed it out-of-band. Now the engine drives set_issue_done() on the
|
||||||
|
deploy->done success transition.
|
||||||
|
"""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||||
|
)
|
||||||
|
task_id = _make_task("deploy", wi="ET-012")
|
||||||
|
res = advance_stage(
|
||||||
|
task_id, "deploy", "enduro-trails", "ET-012",
|
||||||
|
"feature/ET-012-x", finished_agent="deployer",
|
||||||
|
)
|
||||||
|
assert res.advanced is True
|
||||||
|
assert _stage(task_id) == "done"
|
||||||
|
# The terminal Plane sync was invoked with the work item id.
|
||||||
|
stage_engine.set_issue_done.assert_called_once_with("ET-012")
|
||||||
|
|
||||||
|
def test_non_terminal_advance_does_not_force_plane_done(self, monkeypatch):
|
||||||
|
"""set_issue_done must only fire on the terminal deploy->done transition."""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||||
|
)
|
||||||
|
task_id = _make_task("review")
|
||||||
|
advance_stage(
|
||||||
|
task_id, "review", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None,
|
||||||
|
)
|
||||||
|
stage_engine.set_issue_done.assert_not_called()
|
||||||
|
|
||||||
def test_done_is_terminal(self):
|
def test_done_is_terminal(self):
|
||||||
task_id = _make_task("done")
|
task_id = _make_task("done")
|
||||||
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||||
@@ -203,10 +238,13 @@ class TestQgFailureDoesNotAdvance:
|
|||||||
assert _jobs() == []
|
assert _jobs() == []
|
||||||
|
|
||||||
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
def test_webhook_path_emits_qg_failure_notification(self, monkeypatch):
|
||||||
"""finished_agent=None -> generic QG-failure notification fires (plane parity)."""
|
"""finished_agent=None -> generic QG-failure notification fires (plane parity).
|
||||||
|
|
||||||
|
development stage QG is now check_ci_green (was check_tests_local).
|
||||||
|
"""
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
stage_engine, "QG_CHECKS",
|
stage_engine, "QG_CHECKS",
|
||||||
{**stage_engine.QG_CHECKS, "check_tests_local": _fail("ci red")},
|
{**stage_engine.QG_CHECKS, "check_ci_green": _fail("ci red")},
|
||||||
)
|
)
|
||||||
task_id = _make_task("development")
|
task_id = _make_task("development")
|
||||||
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
advance_stage(task_id, "development", "enduro-trails", "ET-001",
|
||||||
@@ -297,6 +335,59 @@ class TestTesterFail:
|
|||||||
assert _jobs() == []
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# BUG 8: deploy verdict gates deploy -> done (not the LLM exit code)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestDeployVerdict:
|
||||||
|
"""deploy -> done must be gated on check_deploy_status (the deployer's
|
||||||
|
machine-readable verdict), NOT on the LLM exit code (always 0)."""
|
||||||
|
|
||||||
|
def test_failed_verdict_rolls_back_to_development(self, monkeypatch):
|
||||||
|
# deployer finished (exit_code 0 from launcher), but verdict is FAILED.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS,
|
||||||
|
"check_deploy_status": _fail("Deploy status: FAILED")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("deploy")
|
||||||
|
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||||
|
"feature/ET-011-x", finished_agent="deployer")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert res.rolled_back_to == "development"
|
||||||
|
assert _stage(task_id) == "development" # NOT done
|
||||||
|
assert res.alerted is True
|
||||||
|
assert stage_engine.set_issue_blocked.called
|
||||||
|
assert stage_engine.send_telegram.called
|
||||||
|
|
||||||
|
def test_no_deploy_log_rolls_back(self, monkeypatch):
|
||||||
|
# No frontmatter field / no file -> check returns False -> rollback.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS,
|
||||||
|
"check_deploy_status": _fail("Deploy log not found (14-deploy-log.md)")},
|
||||||
|
)
|
||||||
|
task_id = _make_task("deploy")
|
||||||
|
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||||
|
"feature/ET-011-x", finished_agent="deployer")
|
||||||
|
assert res.advanced is False
|
||||||
|
assert _stage(task_id) == "development"
|
||||||
|
|
||||||
|
def test_success_verdict_advances_to_done(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{**stage_engine.QG_CHECKS,
|
||||||
|
"check_deploy_status": _pass},
|
||||||
|
)
|
||||||
|
task_id = _make_task("deploy")
|
||||||
|
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||||
|
"feature/ET-011-x", finished_agent="deployer")
|
||||||
|
assert res.advanced is True
|
||||||
|
assert res.to_stage == "done"
|
||||||
|
assert _stage(task_id) == "done"
|
||||||
|
assert res.enqueued_agent is None # no agent leaves deploy
|
||||||
|
assert _jobs() == []
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Architect conflict -> rollback to analysis + enqueue analyst
|
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -358,6 +449,63 @@ class TestAnalysisApprovedFlow:
|
|||||||
assert stage_engine.notify_approve_requested.called
|
assert stage_engine.notify_approve_requested.called
|
||||||
assert _jobs() == []
|
assert _jobs() == []
|
||||||
|
|
||||||
|
def test_approved_verdict_advances_analysis_to_architecture(self, monkeypatch):
|
||||||
|
"""BUG 4: a human Approved STATUS (webhook path, finished_agent=None)
|
||||||
|
must satisfy the analysis gate and advance analysis -> architecture,
|
||||||
|
enqueuing the architect. The status-only approval must NOT re-run
|
||||||
|
check_analysis_approved (which looks for an :approved: COMMENT and would
|
||||||
|
otherwise wrongly block the advance).
|
||||||
|
"""
|
||||||
|
# Make check_analysis_approved FAIL if it is ever called: the webhook
|
||||||
|
# path must bypass it entirely (status == approval). If the engine were
|
||||||
|
# to re-run the gate, this would block the advance and fail the test.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
stage_engine, "QG_CHECKS",
|
||||||
|
{
|
||||||
|
**stage_engine.QG_CHECKS,
|
||||||
|
"check_analysis_approved": _fail("no :approved: comment"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
# Guard: the approval-flow (launcher-only) must NOT be invoked here.
|
||||||
|
flow = MagicMock()
|
||||||
|
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||||
|
|
||||||
|
task_id = _make_task("analysis")
|
||||||
|
res = advance_stage(
|
||||||
|
task_id, "analysis", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.advanced is True
|
||||||
|
assert res.to_stage == "architecture"
|
||||||
|
assert _stage(task_id) == "architecture"
|
||||||
|
assert res.enqueued_agent == "architect"
|
||||||
|
# Sanity: agent for analysis is architect, never analyst (no re-run loop).
|
||||||
|
assert get_agent_for_stage("analysis") == "architect"
|
||||||
|
jobs = _jobs()
|
||||||
|
assert len(jobs) == 1
|
||||||
|
assert jobs[0]["agent"] == "architect"
|
||||||
|
# The launcher-only approval-flow was NOT called on the webhook path.
|
||||||
|
flow.assert_not_called()
|
||||||
|
|
||||||
|
def test_launcher_path_does_not_advance_and_calls_flow(self, monkeypatch):
|
||||||
|
"""Regression: the launcher path (finished_agent='analyst') still routes
|
||||||
|
into _handle_analysis_approved_flow and does NOT advance.
|
||||||
|
"""
|
||||||
|
flow = MagicMock()
|
||||||
|
monkeypatch.setattr(stage_engine, "_handle_analysis_approved_flow", flow)
|
||||||
|
|
||||||
|
task_id = _make_task("analysis")
|
||||||
|
res = advance_stage(
|
||||||
|
task_id, "analysis", "enduro-trails", "ET-001",
|
||||||
|
"feature/ET-001-x", finished_agent="analyst",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.advanced is not True
|
||||||
|
assert _stage(task_id) == "analysis"
|
||||||
|
assert _jobs() == []
|
||||||
|
flow.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# launcher + plane both delegate to the engine
|
# launcher + plane both delegate to the engine
|
||||||
|
|||||||
200
tests/test_status_only_verdict.py
Normal file
200
tests/test_status_only_verdict.py
Normal file
@@ -0,0 +1,200 @@
|
|||||||
|
"""Status-only verdict model (bug 3 fix).
|
||||||
|
|
||||||
|
The comment-based control mechanism (:approved: / :rejected: / answer-to-questions)
|
||||||
|
was removed. The pipeline is driven SOLELY by Plane status changes. These tests
|
||||||
|
lock in the new behaviour:
|
||||||
|
|
||||||
|
* test_inreview_comment_does_not_revert — bug 3 root: an In Review task,
|
||||||
|
any comment arrives -> status NOT reverted, no agent launched.
|
||||||
|
* test_any_comment_no_pipeline_action — :approved: / :rejected: / plain
|
||||||
|
text comment -> no status change, no enqueue.
|
||||||
|
* test_approved_status_advances_without_inprogress_reset — Approved status
|
||||||
|
advances WITHOUT an intermediate set_issue_in_progress reset.
|
||||||
|
* test_rejected_status_pulls_reason_from_comment — Rejected status pulls the
|
||||||
|
reason from the issue's latest comment (mocked GET comments).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_status_only.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
|
||||||
|
import pytest # noqa: E402
|
||||||
|
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||||
|
from fastapi.testclient import TestClient # noqa: E402
|
||||||
|
|
||||||
|
from src.main import app # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import projects as P # noqa: E402
|
||||||
|
from src.projects import reload_projects # noqa: E402
|
||||||
|
|
||||||
|
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||||
|
APPROVED = "a519a341-dada-4a91-8910-7604f82b79c5"
|
||||||
|
REJECTED = "ba958f3c-5db5-461d-8f82-89425e413b97"
|
||||||
|
IN_REVIEW = "38fb1f64-aa1e-48a3-92e0-0b109679046b"
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup(monkeypatch):
|
||||||
|
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||||
|
import src.db as _db
|
||||||
|
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
init_db()
|
||||||
|
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||||
|
registry_json = (
|
||||||
|
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||||
|
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||||
|
reload_projects()
|
||||||
|
# Seed a task at the 'review' stage for plane_id 'r-1'.
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("r-1", "ET-700", "enduro-trails", "feature/ET-700-x", "review", "r-1"),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
yield
|
||||||
|
reload_projects()
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeResp:
|
||||||
|
def __init__(self, status_code, payload):
|
||||||
|
self.status_code = status_code
|
||||||
|
self._payload = payload
|
||||||
|
|
||||||
|
def json(self):
|
||||||
|
return self._payload
|
||||||
|
|
||||||
|
|
||||||
|
def _comment(text, plane_id="r-1"):
|
||||||
|
return client.post("/webhook/plane", json={
|
||||||
|
"event": "issue_comment", "action": "created",
|
||||||
|
"data": {"work_item_id": plane_id, "comment_stripped": text,
|
||||||
|
"project": ENDURO_PLANE_ID},
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def _status(state_id, plane_id="r-1", old="prev"):
|
||||||
|
return client.post("/webhook/plane", json={
|
||||||
|
"event": "issue", "action": "updated",
|
||||||
|
"data": {
|
||||||
|
"id": plane_id, "name": "Status task", "project": ENDURO_PLANE_ID,
|
||||||
|
"state": {"id": state_id, "name": "X", "group": "started"},
|
||||||
|
},
|
||||||
|
"activity": {"field": "state", "new_value": state_id, "old_value": old},
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def _stage(plane_id="r-1"):
|
||||||
|
conn = get_db()
|
||||||
|
row = conn.execute("SELECT stage FROM tasks WHERE plane_id=?", (plane_id,)).fetchone()
|
||||||
|
conn.close()
|
||||||
|
return row[0] if row else None
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Bug 3 root: In Review must not revert on a comment.
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.webhooks.plane.enqueue_job")
|
||||||
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
|
@patch("src.plane_sync._set_issue_state_direct")
|
||||||
|
@patch("src.plane_sync.update_issue_state")
|
||||||
|
def test_inreview_comment_does_not_revert(
|
||||||
|
mock_update_state, mock_set_direct, mock_sip, mock_enqueue
|
||||||
|
):
|
||||||
|
"""Bug 3: task in In Review, ANY comment arrives -> status NOT reverted to
|
||||||
|
In Progress, NO agent launched. The analyst's own 'waiting for approval'
|
||||||
|
comment used to echo back and self-hit -> reverted In Review -> In Progress.
|
||||||
|
"""
|
||||||
|
# analyst's own echo comment
|
||||||
|
resp = _comment("Готово, жду approved")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
# no status changes whatsoever
|
||||||
|
mock_sip.assert_not_called()
|
||||||
|
mock_set_direct.assert_not_called()
|
||||||
|
mock_update_state.assert_not_called()
|
||||||
|
# no agent launched
|
||||||
|
mock_enqueue.assert_not_called()
|
||||||
|
# stage untouched
|
||||||
|
assert _stage() == "review"
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Any comment -> zero pipeline side-effects.
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@pytest.mark.parametrize("text", [":approved:", ":rejected: bad", "plain text", ""])
|
||||||
|
@patch("src.webhooks.plane.enqueue_job")
|
||||||
|
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
|
@patch("src.plane_sync._set_issue_state_direct")
|
||||||
|
def test_any_comment_no_pipeline_action(
|
||||||
|
mock_set_direct, mock_sip, mock_rollback, mock_advance, mock_enqueue, text
|
||||||
|
):
|
||||||
|
resp = _comment(text)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
mock_advance.assert_not_called()
|
||||||
|
mock_rollback.assert_not_called()
|
||||||
|
mock_sip.assert_not_called()
|
||||||
|
mock_set_direct.assert_not_called()
|
||||||
|
mock_enqueue.assert_not_called()
|
||||||
|
assert _stage() == "review"
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Approved status advances WITHOUT in_progress reset.
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
|
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||||
|
def test_approved_status_advances_without_inprogress_reset(mock_advance, mock_sip):
|
||||||
|
resp = _status(APPROVED)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
mock_advance.assert_awaited_once()
|
||||||
|
# work_item_id passed positionally
|
||||||
|
assert "ET-700" in mock_advance.call_args.args
|
||||||
|
# bug 3 (cause B): NO intermediate set_issue_in_progress before advance.
|
||||||
|
mock_sip.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# Rejected status pulls reason from latest comment.
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.webhooks.plane.httpx.get")
|
||||||
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
|
def test_rejected_status_pulls_reason_from_comment(mock_rollback, mock_get):
|
||||||
|
mock_get.return_value = _FakeResp(200, {"results": [
|
||||||
|
{"comment_stripped": "old comment", "created_at": "2026-06-03T09:00:00Z"},
|
||||||
|
{"comment_html": "<p>Needs more test coverage</p>",
|
||||||
|
"created_at": "2026-06-03T11:30:00Z"},
|
||||||
|
]})
|
||||||
|
resp = _status(REJECTED)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
mock_rollback.assert_awaited_once()
|
||||||
|
reason = mock_rollback.call_args.args[-1]
|
||||||
|
# latest by created_at, HTML stripped
|
||||||
|
assert "Needs more test coverage" in reason
|
||||||
|
assert "<p>" not in reason
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.httpx.get")
|
||||||
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
|
def test_rejected_status_no_comment_uses_fallback(mock_rollback, mock_get):
|
||||||
|
mock_get.return_value = _FakeResp(200, {"results": []})
|
||||||
|
resp = _status(REJECTED)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
mock_rollback.assert_awaited_once()
|
||||||
|
reason = mock_rollback.call_args.args[-1]
|
||||||
|
assert "no reason comment" in reason
|
||||||
@@ -2,8 +2,9 @@
|
|||||||
|
|
||||||
* work_item.created / issue created -> NO task, NO branch, NO analyst.
|
* work_item.created / issue created -> NO task, NO branch, NO analyst.
|
||||||
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
|
* issue updated -> In Progress (from backlog) -> task created + analyst enqueued.
|
||||||
* a second In Progress update for the same issue -> NO duplicate, NO restart
|
* a second In Progress update while the agent is busy -> NO duplicate, NO
|
||||||
(protects handle_comment, which also flips issues to In Progress).
|
restart (busy-guard).
|
||||||
|
* In Progress returned from Needs Input (agent idle) -> agent RELAUNCHED.
|
||||||
|
|
||||||
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
|
launcher / Gitea network are mocked. Real FastAPI endpoint via TestClient.
|
||||||
"""
|
"""
|
||||||
@@ -125,15 +126,34 @@ def test_in_progress_starts_pipeline(mock_seq, mock_branch, mock_docs, mock_enqu
|
|||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
||||||
def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock_enqueue):
|
def test_repeat_in_progress_while_job_active_does_not_relaunch(
|
||||||
|
mock_seq, mock_branch, mock_docs, mock_enqueue
|
||||||
|
):
|
||||||
|
"""Status-only model busy-guard: a duplicate In Progress webhook that arrives
|
||||||
|
while the stage agent still has a queued/running job must NOT relaunch the
|
||||||
|
agent (no double launch).
|
||||||
|
"""
|
||||||
mock_enqueue.return_value = 1
|
mock_enqueue.return_value = 1
|
||||||
_to_in_progress("st-2")
|
_to_in_progress("st-2")
|
||||||
assert _count("st-2") == 1
|
assert _count("st-2") == 1
|
||||||
assert mock_enqueue.call_count == 1
|
assert mock_enqueue.call_count == 1
|
||||||
|
|
||||||
# Second In Progress update (e.g. handle_comment re-set the status). Use a
|
# enqueue_job is mocked above, so no real job row exists. Seed an ACTIVE
|
||||||
# DISTINCT body (different activity old_value) so webhook dedup does NOT
|
# (queued) job for the task so has_active_job_for_task() reports the agent as
|
||||||
# short-circuit it — this exercises the existing-task idempotency guard in
|
# busy -> the busy-guard fires.
|
||||||
|
conn = get_db()
|
||||||
|
task_id = conn.execute(
|
||||||
|
"SELECT id FROM tasks WHERE plane_id='st-2'"
|
||||||
|
).fetchone()[0]
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'queued')",
|
||||||
|
("analyst", "enduro-trails", task_id),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Second In Progress update. DISTINCT body (different activity old_value) so
|
||||||
|
# webhook dedup does NOT short-circuit it — this exercises the busy-guard in
|
||||||
# handle_status_start, not the delivery-dedup layer.
|
# handle_status_start, not the delivery-dedup layer.
|
||||||
resp = client.post("/webhook/plane", json={
|
resp = client.post("/webhook/plane", json={
|
||||||
"event": "issue", "action": "updated",
|
"event": "issue", "action": "updated",
|
||||||
@@ -147,4 +167,77 @@ def test_repeat_in_progress_is_idempotent(mock_seq, mock_branch, mock_docs, mock
|
|||||||
})
|
})
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
assert _count("st-2") == 1 # still exactly one task
|
assert _count("st-2") == 1 # still exactly one task
|
||||||
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued
|
assert mock_enqueue.call_count == 1 # analyst NOT re-enqueued (busy-guard)
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.add_comment", create=True)
|
||||||
|
@patch("src.webhooks.plane.enqueue_job")
|
||||||
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
|
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=5)
|
||||||
|
def test_inprogress_from_needs_input_relaunches_analyst(
|
||||||
|
mock_seq, mock_branch, mock_docs, mock_enqueue, mock_comment
|
||||||
|
):
|
||||||
|
"""Status-only answer-to-questions flow: an existing analysis task whose agent
|
||||||
|
is IDLE (no active job — it went to Needs Input) is returned to In Progress
|
||||||
|
-> the analyst is relaunched to read Slava's fresh comments.
|
||||||
|
|
||||||
|
+ double-webhook protection: a second In Progress while the relaunch job is
|
||||||
|
active does NOT relaunch again.
|
||||||
|
"""
|
||||||
|
mock_enqueue.return_value = 1
|
||||||
|
# First In Progress: starts the pipeline (creates task + enqueues analyst).
|
||||||
|
_to_in_progress("st-ni")
|
||||||
|
assert _count("st-ni") == 1
|
||||||
|
assert mock_enqueue.call_count == 1
|
||||||
|
|
||||||
|
# The analyst finished and asked questions -> Needs Input. In our model that
|
||||||
|
# means NO active job for the task (enqueue_job is mocked, so no job row).
|
||||||
|
conn = get_db()
|
||||||
|
task_id = conn.execute(
|
||||||
|
"SELECT id FROM tasks WHERE plane_id='st-ni'"
|
||||||
|
).fetchone()[0]
|
||||||
|
has_job = conn.execute(
|
||||||
|
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND status IN ('queued','running')",
|
||||||
|
(task_id,),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
assert has_job == 0 # agent idle
|
||||||
|
|
||||||
|
# Slava answers + returns the issue to In Progress (distinct body).
|
||||||
|
resp = client.post("/webhook/plane", json={
|
||||||
|
"event": "issue", "action": "updated",
|
||||||
|
"data": {
|
||||||
|
"id": "st-ni", "name": "A valid backlog item title",
|
||||||
|
"description_stripped": "A sufficiently long description for QG-0.",
|
||||||
|
"project": ENDURO_PLANE_ID,
|
||||||
|
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||||
|
},
|
||||||
|
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "needs-input"},
|
||||||
|
})
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert _count("st-ni") == 1 # no duplicate task
|
||||||
|
assert mock_enqueue.call_count == 2 # analyst RELAUNCHED
|
||||||
|
assert mock_enqueue.call_args.args[0] == "analyst"
|
||||||
|
|
||||||
|
# Seed an active job for the relaunch, then a SECOND In Progress webhook must
|
||||||
|
# NOT relaunch again (busy-guard against double webhooks).
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO jobs (agent, repo, task_id, status) VALUES (?, ?, ?, 'running')",
|
||||||
|
("analyst", "enduro-trails", task_id),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
resp2 = client.post("/webhook/plane", json={
|
||||||
|
"event": "issue", "action": "updated",
|
||||||
|
"data": {
|
||||||
|
"id": "st-ni", "name": "A valid backlog item title",
|
||||||
|
"description_stripped": "A sufficiently long description for QG-0.",
|
||||||
|
"project": ENDURO_PLANE_ID,
|
||||||
|
"state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
|
||||||
|
},
|
||||||
|
"activity": {"field": "state", "new_value": IN_PROGRESS, "old_value": "x-y-z"},
|
||||||
|
})
|
||||||
|
assert resp2.status_code == 200
|
||||||
|
assert mock_enqueue.call_count == 2 # still 2 — busy-guard held
|
||||||
|
|||||||
138
tests/test_taskmd_description.py
Normal file
138
tests/test_taskmd_description.py
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
"""Tests for fix/taskmd-description (3 bugs at the analyst pipeline entry/exit):
|
||||||
|
|
||||||
|
BUG A: start_pipeline built the analyst .task.md WITHOUT the description body
|
||||||
|
(only Title), so analyst received a ~101-byte file and reported the
|
||||||
|
"business request is empty". task_desc must now carry the description.
|
||||||
|
|
||||||
|
BUG B: issue.updated ships only changed fields, so `name` is usually absent ->
|
||||||
|
slug/branch became "untitled". start_pipeline must pull the real name
|
||||||
|
from the Plane API (single fetch_issue_fields GET, above the slug build)
|
||||||
|
so the branch slug is NOT "untitled".
|
||||||
|
|
||||||
|
BUG C: the analyst "artifacts ready" comment used the obsolete ":approved:"
|
||||||
|
wording. Under the status-only model it must ask for the **Approved**
|
||||||
|
status (not ":approved:", not "In Progress") and link the docs that
|
||||||
|
actually exist.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_taskmd_desc.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
os.environ.setdefault("ORCH_PLANE_WEBHOOK_SECRET", "")
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
|
||||||
|
import pytest # noqa: E402
|
||||||
|
from unittest.mock import patch, AsyncMock # noqa: E402
|
||||||
|
from fastapi.testclient import TestClient # noqa: E402
|
||||||
|
|
||||||
|
from src.main import app # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import projects as P # noqa: E402
|
||||||
|
from src.projects import reload_projects # noqa: E402
|
||||||
|
|
||||||
|
ENDURO_PLANE_ID = "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||||
|
IN_PROGRESS = "b873d9eb-993c-48cd-97ac-99a9b1623967"
|
||||||
|
BACKLOG = "113b24f6-cce8-4be9-9a22-a359b9cf0122"
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup(monkeypatch):
|
||||||
|
monkeypatch.setattr(P.settings, "db_path", _test_db)
|
||||||
|
import src.db as _db
|
||||||
|
monkeypatch.setattr(_db.settings, "db_path", _test_db)
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
init_db()
|
||||||
|
monkeypatch.setattr("src.webhooks.plane.verify_plane_signature", lambda body, sig: True)
|
||||||
|
registry_json = (
|
||||||
|
f'[{{"plane_project_id": "{ENDURO_PLANE_ID}", "repo": "enduro-trails",'
|
||||||
|
f' "work_item_prefix": "ET", "name": "enduro-trails"}}]'
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(P.settings, "projects_json", registry_json)
|
||||||
|
reload_projects()
|
||||||
|
yield
|
||||||
|
reload_projects()
|
||||||
|
if os.path.exists(_test_db):
|
||||||
|
os.unlink(_test_db)
|
||||||
|
|
||||||
|
|
||||||
|
def _task(plane_id):
    """Fetch the ``tasks`` row for ``plane_id``.

    Args:
        plane_id: value matched against the ``plane_id`` column.

    Returns:
        Whatever ``fetchone()`` yields for the query; callers in this module
        check the result against ``None`` and index it by column name.
    """
    conn = get_db()
    try:
        return conn.execute(
            "SELECT * FROM tasks WHERE plane_id=?", (plane_id,)
        ).fetchone()
    finally:
        # Close even when the query raises, so a failing test does not leave
        # the temp DB file held open while the fixture tries to unlink it.
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# BUG A: description reaches the analyst .task.md
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
@patch(
    "src.plane_sync.fetch_issue_fields",
    return_value=(
        "ET-011 real title",
        "REAL BUSINESS REQUEST BODY: user wants GPX upload with "
        "validation and a results map.",
    ),
)
def test_taskdesc_includes_description(
    mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
    """The fetched issue description must end up in the enqueued task text."""
    # Status-change payload: NO name, NO description (only the changed field).
    issue_data = {
        "id": "taskA",
        "project": ENDURO_PLANE_ID,
        "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
    }
    state_activity = {
        "field": "state",
        "new_value": IN_PROGRESS,
        "old_value": BACKLOG,
    }
    webhook_body = {
        "event": "issue",
        "action": "updated",
        "data": issue_data,
        "activity": state_activity,
    }

    response = client.post("/webhook/plane", json=webhook_body)

    assert response.status_code == 200
    mock_enqueue.assert_called_once()
    # enqueue_job(agent, repo, task_desc, ...): task_desc is positional arg #3.
    queued_desc = mock_enqueue.call_args.args[2]
    # The section header plus the actual body text (not just the title).
    for fragment in ("Description:", "REAL BUSINESS REQUEST BODY", "results map"):
        assert fragment in queued_desc
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# BUG B: name fetched from Plane API when payload is empty -> slug not untitled
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.webhooks.plane.enqueue_job", return_value=1)
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
@patch("src.plane_sync.fetch_issue_sequence_id", return_value=11)
@patch(
    "src.plane_sync.fetch_issue_fields",
    return_value=(
        "GPX upload feature",
        "A sufficiently long description so QG-0 passes cleanly.",
    ),
)
def test_name_fetched_when_payload_empty(
    mock_fields, mock_seq, mock_branch, mock_docs, mock_enqueue
):
    """With no name in the payload, the fetched name drives slug and title."""
    # NO name, NO description in the payload (Plane status-change shape).
    issue_data = {
        "id": "taskB",
        "project": ENDURO_PLANE_ID,
        "state": {"id": IN_PROGRESS, "name": "In Progress", "group": "started"},
    }
    state_activity = {
        "field": "state",
        "new_value": IN_PROGRESS,
        "old_value": BACKLOG,
    }
    webhook_body = {
        "event": "issue",
        "action": "updated",
        "data": issue_data,
        "activity": state_activity,
    }

    response = client.post("/webhook/plane", json=webhook_body)

    assert response.status_code == 200
    mock_fields.assert_called_once()

    stored = _task("taskB")
    assert stored is not None
    branch_name = stored["branch"]
    # Slug comes from the fetched name -> "gpx-upload-feature", NOT "untitled".
    assert "untitled" not in branch_name
    assert "gpx-upload-feature" in branch_name

    # The Title in the analyst task text is the fetched name as well.
    queued_desc = mock_enqueue.call_args.args[2]
    assert "Title: GPX upload feature" in queued_desc
|
||||||
518
tests/test_telegram_tracker.py
Normal file
518
tests/test_telegram_tracker.py
Normal file
@@ -0,0 +1,518 @@
|
|||||||
|
"""feat/telegram-live-tracker: tests for the live Telegram task tracker.
|
||||||
|
|
||||||
|
Covers (per DEV_TASK_TELEGRAM_TRACKER.md):
|
||||||
|
* short_model_name: provider/claude- prefix trimming.
|
||||||
|
* render_task_tracker: per-stage line format (in↓/out↑, model, cost, minutes),
|
||||||
|
the "⏸️ Ревью БРД · твоё время" line, the 💰 totals, and the finish block
|
||||||
|
(⏱️ three times + 🔗/📦).
|
||||||
|
* first message -> sendMessage stores message_id; transition -> editMessageText.
|
||||||
|
* fallback: editMessageText fails -> a NEW message is sent and the id updated.
|
||||||
|
* which alerts go out SEPARATELY (approve-gate / deploy-fail / agent-fail /
|
||||||
|
error) vs which do NOT (QG-pending / agent-start / stage-transition).
|
||||||
|
|
||||||
|
Isolated temp DB; no network (httpx is patched).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||||
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||||
|
|
||||||
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_tracker.db")
|
||||||
|
os.environ["ORCH_DB_PATH"] = _test_db
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock, patch # noqa: E402
|
||||||
|
|
||||||
|
import pytest # noqa: E402
|
||||||
|
|
||||||
|
import src.db as db_module # noqa: E402
|
||||||
|
from src.db import init_db, get_db # noqa: E402
|
||||||
|
from src import notifications as N # noqa: E402
|
||||||
|
from src import usage as U # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Run each test against a freshly initialised temp database.

    The DB path on ``db_module.settings`` is redirected to the temp file, any
    stale file is removed, ``init_db()`` is run, and the file is removed
    again once the test has finished.
    """

    def _purge_db_file():
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _purge_db_file()
    init_db()
    # NOTE(review): conftest reportedly stubs send_telegram to a no-op; nothing
    # in this fixture re-enables it. The tests patch httpx / the lower-level
    # helpers explicitly per case instead.
    yield
    _purge_db_file()
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# helpers to build a task + runs in the DB
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def _mk_task(stage="development", title="\u0422\u0440\u0435\u043a\u0438 \u0441 \u0437\u0443\u043c\u0430 z5",
             wid="ET-012", brd_start=None, brd_end=None):
    """Insert one row into ``tasks`` and return its row id.

    Args:
        stage: value for the ``stage`` column.
        title: value for the ``title`` column.
        wid: value for the ``work_item_id`` column.
        brd_start: value for ``brd_review_started_at`` (may be None).
        brd_end: value for ``brd_review_ended_at`` (may be None).

    Returns:
        ``lastrowid`` of the INSERT cursor.

    The plane id, repo and branch are fixed to "p1", "enduro-trails" and
    "feature/ET-012-x".
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
            "brd_review_started_at, brd_review_ended_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            ("p1", wid, "enduro-trails", "feature/ET-012-x", stage, title,
             brd_start, brd_end),
        )
        tid = cur.lastrowid
        conn.commit()
        return tid
    finally:
        # Always release the connection, even if the INSERT/commit raises, so
        # the fixture can still delete the temp DB file afterwards.
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _mk_run(task_id, agent, started, finished, in_tok, out_tok,
            cache_read=0, cache_creation=0, cost=0.0, model=None, exit_code=0):
    """Insert one row into ``agent_runs`` and return its row id.

    Args:
        task_id: id of the owning ``tasks`` row.
        agent: agent name stored in the ``agent`` column.
        started: ``started_at`` value.
        finished: ``finished_at`` value; tests pass None for a run that is
            still active.
        in_tok: ``input_tokens`` value.
        out_tok: ``output_tokens`` value.
        cache_read: ``cache_read_tokens`` value.
        cache_creation: ``cache_creation_tokens`` value.
        cost: ``cost_usd`` value.
        model: ``model`` value (may be None).
        exit_code: ``exit_code`` value (may be None).

    Returns:
        ``lastrowid`` of the INSERT cursor.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
            "exit_code, input_tokens, output_tokens, cache_read_tokens, "
            "cache_creation_tokens, cost_usd, model) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (task_id, agent, started, finished, exit_code, in_tok, out_tok,
             cache_read, cache_creation, cost, model),
        )
        rid = cur.lastrowid
        conn.commit()
        return rid
    finally:
        # Always release the connection, even if the INSERT/commit raises, so
        # the fixture can still delete the temp DB file afterwards.
        conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# short_model_name
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def test_short_model_name():
    """short_model_name trims the provider/claude- prefix; falsy input -> ""."""
    expectations = [
        ("tokenator/claude-opus-4-8", "opus-4-8"),
        ("vibecode/claude-sonnet-4.6", "sonnet-4.6"),
        ("claude-opus-4-8", "opus-4-8"),
        ("opus-4-8", "opus-4-8"),
        (None, ""),
        ("", ""),
    ]
    for raw_name, trimmed in expectations:
        assert U.short_model_name(raw_name) == trimmed
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_usage_extracts_model_from_modelusage():
    """The key of the ``modelUsage`` object is reported as the model."""
    result_json = (
        '{"total_cost_usd":0.01,'
        '"usage":{"input_tokens":10,"output_tokens":5},'
        '"modelUsage":{"claude-opus-4-8":{"inputTokens":10,"outputTokens":5}}}'
    )
    parsed = U.parse_usage_from_text(result_json)
    assert parsed["model"] == "claude-opus-4-8"
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# render_task_tracker
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def test_render_in_progress_stage_lines_and_totals():
    """In-progress tracker: stage lines, BRD review line, active stage, totals."""
    tid = _mk_task(stage="deploy", brd_start="2026-06-04 10:00:00",
                   brd_end="2026-06-04 10:08:00")

    opus = "tokenator/claude-opus-4-8"
    sonnet = "vibecode/claude-sonnet-4.6"
    # (agent, started, finished, in, out, cache_read, cost, model)
    # First row = Analysis: 10 min, ~1.1M in (mostly cache) / 39.6k out, $2.38.
    finished_runs = [
        ("analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
         1000, 39600, 1_100_000, 2.38, opus),
        ("architect", "2026-06-04 10:08:00", "2026-06-04 10:17:00",
         500, 34400, 1_500_000, 2.24, opus),
        ("developer", "2026-06-04 10:17:00", "2026-06-04 10:28:00",
         400, 45800, 8_400_000, 7.29, opus),
        ("reviewer", "2026-06-04 10:28:00", "2026-06-04 10:31:00",
         300, 12900, 1_200_000, 1.53, sonnet),
        ("tester", "2026-06-04 10:31:00", "2026-06-04 10:36:00",
         200, 19500, 1_200_000, 1.51, sonnet),
    ]
    for agent, began, ended, tok_in, tok_out, cached, usd, model_id in finished_runs:
        _mk_run(tid, agent, began, ended, in_tok=tok_in, out_tok=tok_out,
                cache_read=cached, cost=usd, model=model_id)
    # Deployer started but not finished -> expected to render as the active line.
    _mk_run(tid, "deployer", "2026-06-04 10:36:00", None,
            in_tok=0, out_tok=0, model=None, exit_code=None)

    text = N.render_task_tracker(tid)

    # Header for an in-progress task.
    assert text.startswith("\U0001f6e0\ufe0f ET-012 \u00b7 \u0422\u0440\u0435\u043a\u0438")

    expected_fragments = (
        # Per-stage format: in/out arrows, cost, model.
        "\u2705 Analysis",
        "10\u043c",            # analysis duration
        "39.6k\u2191",         # analysis output tokens
        "$2.38",
        "opus-4-8",
        "sonnet-4.6",          # reviewer/tester model
        # BRD review line (human time, ended).
        "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414",
        "\u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f",
        # Active stage.
        "\U0001f504 Deploy",
        "\u0438\u0434\u0451\u0442",
        # Totals line marker (money bag emoji).
        "\U0001f4b0",
    )
    for fragment in expected_fragments:
        assert fragment in text

    # In-progress: the final stopwatch summary line must be absent.
    assert "\u0412\u0441\u0435\u0433\u043e" not in text
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_brd_review_waiting_shows_hourglass():
    """BRD review started but not ended -> review line plus an hourglass."""
    tid = _mk_task(stage="analysis", brd_start="2026-06-04 10:00:00",
                   brd_end=None)
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
            model="tokenator/claude-opus-4-8")

    rendered = N.render_task_tracker(tid)

    brd_review_label = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414"
    hourglass = "\u23f3"  # shown while waiting
    assert brd_review_label in rendered
    assert hourglass in rendered
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_done_has_times_and_links():
    """Finished task: celebratory header, stopwatch summary and deployed line."""
    tid = _mk_task(stage="done", brd_start="2026-06-04 10:00:00",
                   brd_end="2026-06-04 10:08:00")

    # Pin created/updated timestamps so a wall-clock duration can be computed.
    db_conn = get_db()
    db_conn.execute(
        "UPDATE tasks SET created_at='2026-06-04 09:00:00', "
        "updated_at='2026-06-04 09:56:00' WHERE id=?", (tid,))
    db_conn.commit()
    db_conn.close()

    opus = "tokenator/claude-opus-4-8"
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
            model=opus)
    _mk_run(tid, "deployer", "2026-06-04 09:50:00", "2026-06-04 09:56:00",
            in_tok=400, out_tok=22400, cache_read=1_600_000, cost=1.73,
            model=opus)

    # httpx.get answers 200 with an empty list (no PR found), so only the
    # plain "deployed" line is expected.
    empty_listing = MagicMock(status_code=200, **{"json.return_value": []})
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.get.return_value = empty_listing
        text = N.render_task_tracker(tid)

    assert text.startswith("\U0001f389 ET-012")
    expected_fragments = (
        "\u0413\u041e\u0422\u041e\u0412\u041e",
        # Stopwatch line with its three durations.
        "\u23f1\ufe0f",
        "\u0412\u0441\u0435\u0433\u043e",
        "\u0430\u0433\u0435\u043d\u0442\u044b",
        "\u0442\u0432\u043e\u0451",
        # Package emoji of the deployed line.
        "\U0001f4e6",
    )
    for fragment in expected_fragments:
        assert fragment in text
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_escapes_html_in_title():
    """HTML metacharacters in the task title must be rendered escaped.

    The previous assertions looked for the raw ``<b>`` (absent once the title
    is escaped) and for a bare ``&`` (present in every entity, so it proved
    nothing). The escaped entities are asserted instead, and the raw title
    must not leak through.
    """
    raw_title = "A <b>& B</b>"
    tid = _mk_task(stage="analysis", title=raw_title)
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.0)

    text = N.render_task_tracker(tid)

    assert "&lt;b&gt;" in text
    assert "&amp;" in text
    # The unescaped markup would be interpreted by an HTML-parsing client.
    assert raw_title not in text
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_omits_model_when_unknown():
    """Without a model, the Analysis line ends at the cost (no model suffix)."""
    tid = _mk_task(stage="analysis")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.0, model=None)

    rendered = N.render_task_tracker(tid)

    analysis_rows = [
        row for row in rendered.splitlines()
        if row.startswith("\u2705 Analysis")
    ]
    # No trailing model segment: the line stops right after the cost.
    assert analysis_rows[0].rstrip().endswith("$0.00")
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# tracker send / edit / fallback
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def test_first_call_sends_message_and_stores_id(monkeypatch):
    """No stored message id yet -> tracker is sent silently and its id saved."""
    from src.db import get_tracker_message_id

    tid = _mk_task(stage="analysis")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", None, in_tok=0, out_tok=0,
            exit_code=None)

    sent = {}

    def _fake_send(text, disable_notification=False):
        # Record what the tracker sent and pretend Telegram returned id 555.
        sent.update(text=text, silent=disable_notification)
        return 555

    def _edit_forbidden(*a, **k):
        raise AssertionError("should not edit on first call")

    monkeypatch.setattr(N, "send_telegram", _fake_send)
    monkeypatch.setattr(N, "edit_telegram", _edit_forbidden)

    N.update_task_tracker(tid)

    assert get_tracker_message_id(tid) == 555
    assert sent["silent"] is True  # tracker is silent
|
||||||
|
|
||||||
|
|
||||||
|
def test_second_call_edits_existing_message(monkeypatch):
    """With a stored message id, the tracker edits it and sends nothing new."""
    from src.db import set_tracker_message_id

    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    set_tracker_message_id(tid, 777)

    edited = {}

    def _record_edit(mid, text):
        # Remember which message was edited and report success.
        edited.update(mid=mid)
        return N.EDIT_OK

    def _send_forbidden(*a, **k):
        raise AssertionError("should not send when edit succeeds")

    monkeypatch.setattr(N, "edit_telegram", _record_edit)
    monkeypatch.setattr(N, "send_telegram", _send_forbidden)

    N.update_task_tracker(tid)

    assert edited["mid"] == 777
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_to_new_message_when_edit_gone(monkeypatch):
    """EDIT_GONE from the edit call -> a new message is sent and its id stored."""
    from src.db import set_tracker_message_id, get_tracker_message_id

    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    set_tracker_message_id(tid, 100)

    def _edit_reports_gone(mid, text):
        return N.EDIT_GONE

    def _send_returns_new_id(text, disable_notification=False):
        return 200

    monkeypatch.setattr(N, "edit_telegram", _edit_reports_gone)
    monkeypatch.setattr(N, "send_telegram", _send_returns_new_id)

    N.update_task_tracker(tid)

    # The stored id now points at the replacement message.
    assert get_tracker_message_id(tid) == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_not_modified_does_not_send_new_message(monkeypatch):
    """EDIT_NOT_MODIFIED -> nothing is sent and the stored id stays as is."""
    from src.db import set_tracker_message_id, get_tracker_message_id

    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    set_tracker_message_id(tid, 100)

    def _edit_reports_unchanged(mid, text):
        return N.EDIT_NOT_MODIFIED

    def _send_forbidden(*a, **k):
        raise AssertionError("must not send on not_modified")

    monkeypatch.setattr(N, "edit_telegram", _edit_reports_unchanged)
    monkeypatch.setattr(N, "send_telegram", _send_forbidden)

    N.update_task_tracker(tid)

    # Unchanged id means no duplicate tracker message was created.
    assert get_tracker_message_id(tid) == 100
|
||||||
|
|
||||||
|
|
||||||
|
def test_transient_edit_failure_does_not_send_new_message(monkeypatch):
    """EDIT_FAILED (network/timeout/5xx) -> nothing is sent, id is unchanged."""
    from src.db import set_tracker_message_id, get_tracker_message_id

    tid = _mk_task(stage="development")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1)
    set_tracker_message_id(tid, 100)

    def _edit_reports_failure(mid, text):
        return N.EDIT_FAILED

    def _send_forbidden(*a, **k):
        raise AssertionError("must not send on transient failure")

    monkeypatch.setattr(N, "edit_telegram", _edit_reports_failure)
    monkeypatch.setattr(N, "send_telegram", _send_forbidden)

    N.update_task_tracker(tid)

    # Unchanged id means no duplicate tracker message was created.
    assert get_tracker_message_id(tid) == 100
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# edit_telegram outcome classification (httpx mocked)
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def _edit_resp(ok, description=None):
|
||||||
|
resp = MagicMock()
|
||||||
|
body = {"ok": ok}
|
||||||
|
if description is not None:
|
||||||
|
body["description"] = description
|
||||||
|
resp.json.return_value = body
|
||||||
|
return resp
|
||||||
|
|
||||||
|
|
||||||
|
def _patch_tg_creds(monkeypatch):
    """Set dummy Telegram bot token ("T") and chat id ("C") on the settings."""
    dummy_creds = (("telegram_bot_token", "T"), ("telegram_chat_id", "C"))
    for attr_name, dummy_value in dummy_creds:
        # The settings object is looked up afresh for each attribute.
        monkeypatch.setattr(N._get_settings(), attr_name, dummy_value, raising=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_ok(monkeypatch):
    """An ``ok: true`` reply is classified as EDIT_OK."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(True)
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_OK
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_not_modified_is_success(monkeypatch):
    """A "message is not modified" reply maps to EDIT_NOT_MODIFIED, not gone."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(False, "Bad Request: message is not modified: ...")
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_NOT_MODIFIED
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_exactly_the_same_is_not_modified(monkeypatch):
    """The "exactly the same" wording is also treated as EDIT_NOT_MODIFIED."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(
        False,
        "Bad Request: specified new message content and reply markup "
        "are exactly the same",
    )
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_NOT_MODIFIED
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_message_not_found_is_gone(monkeypatch):
    """A "message to edit not found" reply is classified as EDIT_GONE."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(False, "Bad Request: message to edit not found")
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_GONE
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_cant_be_edited_is_gone(monkeypatch):
    """A "message can't be edited" reply is classified as EDIT_GONE."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(False, "Bad Request: message can't be edited")
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_GONE
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_unknown_400_is_failed(monkeypatch):
    """An unrecognised error description maps to EDIT_FAILED, not EDIT_GONE."""
    _patch_tg_creds(monkeypatch)
    api_reply = _edit_resp(False, "Bad Request: some other unexpected error")
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    # FAILED (rather than GONE) so the caller will not post a duplicate.
    assert outcome == N.EDIT_FAILED
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_timeout_is_failed(monkeypatch):
    """An exception raised by the HTTP post is classified as EDIT_FAILED."""
    _patch_tg_creds(monkeypatch)
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.side_effect = Exception("read timeout")
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_FAILED
|
||||||
|
|
||||||
|
|
||||||
|
def test_edit_telegram_5xx_is_failed(monkeypatch):
    """``ok: false`` without gone/not-modified markers maps to EDIT_FAILED."""
    _patch_tg_creds(monkeypatch)
    # Mimics a server-side error body: ok is false, description is generic.
    api_reply = _edit_resp(False, "Internal Server Error")
    with patch("src.notifications.httpx") as fake_httpx:
        fake_httpx.post.return_value = api_reply
        outcome = N.edit_telegram(1, "x")
    assert outcome == N.EDIT_FAILED
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# render: repeated stage attempt shows "попытка N"
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
_POPYTKA = "\u043f\u043e\u043f\u044b\u0442\u043a\u0430" # popytka
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_active_stage_shows_attempt_on_second_run():
    """A second reviewer run in review -> active line carries attempt marker 2."""
    tid = _mk_task(stage="review")
    opus = "tokenator/claude-opus-4-8"
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    _mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    # First review run finished (sent back to dev); second one is still active.
    _mk_run(tid, "reviewer", "2026-06-04 09:20:00", "2026-06-04 09:25:00",
            in_tok=10, out_tok=5, cost=0.1, model="vibecode/claude-sonnet-4.6",
            exit_code=0)
    _mk_run(tid, "reviewer", "2026-06-04 09:30:00", None,
            in_tok=0, out_tok=0, exit_code=None)

    rendered = N.render_task_tracker(tid)

    active_review_rows = [
        row for row in rendered.splitlines()
        if row.startswith("\U0001f504") and "Review" in row
    ]
    active = active_review_rows[0]
    assert _POPYTKA in active
    assert "2" in active
    assert "\u0438\u0434\u0451\u0442" in active
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_active_stage_no_attempt_on_first_run():
    """A single reviewer run -> the active line has no attempt marker."""
    tid = _mk_task(stage="review")
    opus = "tokenator/claude-opus-4-8"
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    _mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    _mk_run(tid, "reviewer", "2026-06-04 09:20:00", None,
            in_tok=0, out_tok=0, exit_code=None)

    rendered = N.render_task_tracker(tid)

    active_review_rows = [
        row for row in rendered.splitlines()
        if row.startswith("\U0001f504") and "Review" in row
    ]
    active = active_review_rows[0]
    assert _POPYTKA not in active
    assert "\u0438\u0434\u0451\u0442" in active
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_finished_lines_unaffected_by_attempt_logic():
    """Completed (checkmark) lines never carry an attempt marker."""
    tid = _mk_task(stage="review")
    opus = "tokenator/claude-opus-4-8"
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    # Developer ran twice (a retry) but development is a FINISHED stage now.
    _mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:15:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)
    _mk_run(tid, "developer", "2026-06-04 09:16:00", "2026-06-04 09:20:00",
            in_tok=10, out_tok=5, cost=0.1, model=opus)

    rendered = N.render_task_tracker(tid)

    finished_rows = [
        row for row in rendered.splitlines() if row.startswith("\u2705")
    ]
    for row in finished_rows:
        assert _POPYTKA not in row
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# which alerts are SEPARATE vs tracker-only
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def test_approve_gate_sends_separate_message_and_starts_brd_clock(monkeypatch):
    """The approve gate pings once (with sound) and stamps the BRD start time."""
    tid = _mk_task(stage="analysis")
    calls = []

    def _capture_send(text, disable_notification=False):
        calls.append((text, disable_notification))
        return 1

    def _skip_tracker(task_id):
        return None

    monkeypatch.setattr(N, "send_telegram", _capture_send)
    monkeypatch.setattr(N, "update_task_tracker", _skip_tracker)

    N.notify_approve_requested(tid)

    # Exactly one SEPARATE (notifying) send for the approve gate.
    assert len(calls) == 1
    message_text, was_silent = calls[0]
    assert was_silent is False  # notifying
    assert "Approved" in message_text

    # The BRD review clock has been started.
    db_conn = get_db()
    stamp_row = db_conn.execute(
        "SELECT brd_review_started_at FROM tasks WHERE id=?", (tid,)
    ).fetchone()
    db_conn.close()
    assert stamp_row[0] is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_error_sends_separate_message(monkeypatch):
    """notify_error produces exactly one notifying message containing ERROR."""
    tid = _mk_task(stage="development")
    calls = []

    def _capture_send(text, disable_notification=False):
        calls.append((text, disable_notification))
        return 1

    monkeypatch.setattr(N, "send_telegram", _capture_send)

    N.notify_error(tid, "boom")

    assert len(calls) == 1
    message_text, was_silent = calls[0]
    assert was_silent is False  # notifying
    assert "ERROR" in message_text
|
||||||
|
|
||||||
|
|
||||||
|
def test_stage_change_does_not_send_separate_message(monkeypatch):
    """A stage transition only refreshes the tracker; no extra message."""
    tid = _mk_task(stage="development")
    sent = []
    refreshed = []

    def _capture_send(text, disable_notification=False):
        sent.append(text)
        return 1

    def _capture_refresh(task_id):
        refreshed.append(task_id)

    monkeypatch.setattr(N, "send_telegram", _capture_send)
    # The tracker refresh itself is stubbed out so that any send_telegram call
    # seen here would have to be a separate notification.
    monkeypatch.setattr(N, "update_task_tracker", _capture_refresh)

    N.notify_stage_change(tid, "development", "review")

    assert sent == []  # no separate message
    assert refreshed == [tid]  # tracker refreshed instead
|
||||||
|
|
||||||
|
|
||||||
|
def test_agent_started_does_not_send_separate_message(monkeypatch):
    """An agent start only refreshes the tracker; no extra message."""
    tid = _mk_task(stage="analysis")
    sent = []
    refreshed = []

    def _capture_send(text, disable_notification=False):
        sent.append(text)
        return 1

    def _capture_refresh(task_id):
        refreshed.append(task_id)

    monkeypatch.setattr(N, "send_telegram", _capture_send)
    monkeypatch.setattr(N, "update_task_tracker", _capture_refresh)

    N.notify_agent_started(1, "analyst", tid)

    assert sent == []
    assert refreshed == [tid]
|
||||||
|
|
||||||
|
|
||||||
|
def test_qg_failure_does_not_send_separate_message(monkeypatch):
    """A quality-gate failure (pending CI) must not trigger send_telegram."""
    tid = _mk_task(stage="development")
    sent = []

    def _capture_send(text, disable_notification=False):
        sent.append(text)
        return 1

    monkeypatch.setattr(N, "send_telegram", _capture_send)

    N.notify_qg_failure(tid, "development", "check_ci_green", "CI state: pending")

    # QG-pending is log-only, never a separate ping.
    assert sent == []
|
||||||
@@ -62,9 +62,27 @@ def test_parse_real_result_json():
|
|||||||
assert u["input_tokens"] == 45231
|
assert u["input_tokens"] == 45231
|
||||||
assert u["output_tokens"] == 12100
|
assert u["output_tokens"] == 12100
|
||||||
assert u["cache_read_tokens"] == 18500
|
assert u["cache_read_tokens"] == 18500
|
||||||
|
# FIX 2: cache_creation slice must now be parsed (was dropped before).
|
||||||
|
assert u["cache_creation_tokens"] == 7418
|
||||||
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
|
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_cache_creation_present():
    """The cache-creation token count is extracted from the sample result JSON."""
    parsed = U.parse_usage_from_text(REAL_RESULT_JSON)
    assert parsed["cache_creation_tokens"] == 7418
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_cache_creation_missing_defaults_zero():
    """No cache-creation field in the usage block -> the count defaults to 0."""
    result_json = (
        '{"total_cost_usd":0.01,'
        '"usage":{"input_tokens":10,"output_tokens":5,'
        '"cache_read_input_tokens":100}}'
    )
    parsed = U.parse_usage_from_text(result_json)
    assert parsed["cache_creation_tokens"] == 0
    assert parsed["cache_read_tokens"] == 100
|
||||||
|
|
||||||
|
|
||||||
def test_parse_with_leading_text():
|
def test_parse_with_leading_text():
|
||||||
"""The agent may print text before the trailing JSON; we still find it."""
|
"""The agent may print text before the trailing JSON; we still find it."""
|
||||||
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
|
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
|
||||||
@@ -106,13 +124,16 @@ def test_record_usage_writes_columns():
|
|||||||
U.record_usage(rid, u)
|
U.record_usage(rid, u)
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
row = conn.execute(
|
row = conn.execute(
|
||||||
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
|
"SELECT input_tokens, output_tokens, cache_read_tokens, "
|
||||||
|
"cache_creation_tokens, cost_usd "
|
||||||
"FROM agent_runs WHERE id=?", (rid,)
|
"FROM agent_runs WHERE id=?", (rid,)
|
||||||
).fetchone()
|
).fetchone()
|
||||||
conn.close()
|
conn.close()
|
||||||
assert row["input_tokens"] == 45231
|
assert row["input_tokens"] == 45231
|
||||||
assert row["output_tokens"] == 12100
|
assert row["output_tokens"] == 12100
|
||||||
assert row["cache_read_tokens"] == 18500
|
assert row["cache_read_tokens"] == 18500
|
||||||
|
# FIX 2: cache_creation column is now persisted.
|
||||||
|
assert row["cache_creation_tokens"] == 7418
|
||||||
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
@@ -144,14 +165,82 @@ def test_fmt_cost():
|
|||||||
|
|
||||||
|
|
||||||
def test_usage_comment_format():
|
def test_usage_comment_format():
|
||||||
|
# No cache -> in_total == input_tokens, no cached breakdown shown.
|
||||||
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
|
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
|
||||||
c = U.usage_comment("developer", u)
|
c = U.usage_comment("developer", u)
|
||||||
assert "Developer" in c
|
assert "Developer" in c
|
||||||
assert "45.2k in" in c
|
assert "45.2k in" in c
|
||||||
|
assert "cached" not in c
|
||||||
assert "12.1k out" in c
|
assert "12.1k out" in c
|
||||||
assert "$0.21" in c
|
assert "$0.21" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_shows_full_input_with_cached():
|
||||||
|
"""FIX 2: in = input + cache_read + cache_creation, with cached breakdown."""
|
||||||
|
u = {
|
||||||
|
"input_tokens": 81,
|
||||||
|
"cache_read_tokens": 8_400_000,
|
||||||
|
"cache_creation_tokens": 100_000,
|
||||||
|
"output_tokens": 45_800,
|
||||||
|
"cost_usd": 7.29,
|
||||||
|
}
|
||||||
|
c = U.usage_comment("developer", u)
|
||||||
|
# total in = 8_500_081 -> 8.5M ; cached = 8_500_000 -> 8.5M
|
||||||
|
assert "8.5M in (8.5M cached)" in c
|
||||||
|
assert "45.8k out" in c
|
||||||
|
assert "$7.29" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_no_cached_when_zero():
|
||||||
|
u = {"input_tokens": 1234, "cache_read_tokens": 0,
|
||||||
|
"cache_creation_tokens": 0, "output_tokens": 50, "cost_usd": 0.01}
|
||||||
|
c = U.usage_comment("developer", u)
|
||||||
|
assert "1.2k in" in c
|
||||||
|
assert "cached" not in c
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
# FIX 4: per-agent artifact links in finish comments
|
||||||
|
# --------------------------------------------------------------------------- #
|
||||||
|
def _ctx():
|
||||||
|
return dict(repo="enduro-trails", branch="feature/ET-012-x",
|
||||||
|
work_item_id="ET-012")
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_reviewer_links_review_doc():
|
||||||
|
c = U.usage_comment("reviewer", {"input_tokens": 5}, **_ctx())
|
||||||
|
assert "12-review.md" in c
|
||||||
|
assert "ET-012" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_tester_links_test_report():
|
||||||
|
c = U.usage_comment("tester", {"input_tokens": 5}, **_ctx())
|
||||||
|
assert "13-test-report.md" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_deployer_links_deploy_log():
|
||||||
|
c = U.usage_comment("deployer", {"input_tokens": 5}, **_ctx())
|
||||||
|
assert "14-deploy-log.md" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_developer_links_pr_and_branch():
|
||||||
|
c = U.usage_comment("developer", {"input_tokens": 5}, pr_number=7, **_ctx())
|
||||||
|
assert "pulls/7" in c
|
||||||
|
assert "feature/ET-012-x" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_architect_links_adr():
|
||||||
|
c = U.usage_comment("architect", {"input_tokens": 5}, **_ctx())
|
||||||
|
assert "06-adr" in c
|
||||||
|
|
||||||
|
|
||||||
|
def test_usage_comment_no_links_without_context():
|
||||||
|
"""Without repo/branch context, no links are appended (no crash)."""
|
||||||
|
c = U.usage_comment("reviewer", {"input_tokens": 5})
|
||||||
|
assert "12-review.md" not in c
|
||||||
|
assert "http" not in c
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
# task summary
|
# task summary
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
@@ -174,3 +263,47 @@ def test_task_summary_aggregates_over_agents():
|
|||||||
assert "$0.15" in comment # total cost
|
assert "$0.15" in comment # total cost
|
||||||
assert "Developer" in comment
|
assert "Developer" in comment
|
||||||
assert "Tester" in comment
|
assert "Tester" in comment
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_summary_sums_all_three_input_components():
|
||||||
|
"""FIX 2: total_in = SUM(input + cache_read + cache_creation); total_cached too."""
|
||||||
|
rid = _new_run(agent="developer", task_id=77)
|
||||||
|
U.record_usage(rid, {
|
||||||
|
"input_tokens": 100,
|
||||||
|
"cache_read_tokens": 2000,
|
||||||
|
"cache_creation_tokens": 900,
|
||||||
|
"output_tokens": 50,
|
||||||
|
"cost_usd": 0.10,
|
||||||
|
})
|
||||||
|
rid2 = _new_run(agent="tester", task_id=77)
|
||||||
|
U.record_usage(rid2, {
|
||||||
|
"input_tokens": 10,
|
||||||
|
"cache_read_tokens": 500,
|
||||||
|
"cache_creation_tokens": 0,
|
||||||
|
"output_tokens": 5,
|
||||||
|
"cost_usd": 0.05,
|
||||||
|
})
|
||||||
|
s = U.task_usage_summary(77)
|
||||||
|
# total_in = (100+2000+900) + (10+500+0) = 3510
|
||||||
|
assert s["total_in"] == 3510
|
||||||
|
# total_cached = (2000+900) + (500+0) = 3400
|
||||||
|
assert s["total_cached"] == 3400
|
||||||
|
assert s["total_out"] == 55
|
||||||
|
comment = U.task_summary_comment(77)
|
||||||
|
assert "cached" in comment
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_summary_handles_null_cache_creation():
|
||||||
|
"""Pre-existing rows (NULL cache_creation) must not break aggregation."""
|
||||||
|
rid = _new_run(agent="developer", task_id=88)
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE agent_runs SET input_tokens=100, cache_read_tokens=200, "
|
||||||
|
"cache_creation_tokens=NULL, output_tokens=10, cost_usd=0.01 WHERE id=?",
|
||||||
|
(rid,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
s = U.task_usage_summary(88) # must not raise
|
||||||
|
assert s["total_in"] == 300 # 100 + 200 + (NULL->0)
|
||||||
|
assert s["total_cached"] == 200
|
||||||
|
|||||||
@@ -1,12 +1,14 @@
|
|||||||
"""Feature 2 (variant B): verdict statuses Approved / Rejected.
|
"""Status-only verdict model: verdict statuses Approved / Rejected.
|
||||||
|
|
||||||
* issue updated -> Approved : calls _try_advance_stage (== :approved: comment).
|
* issue updated -> Approved : calls _try_advance_stage, with NO intermediate
|
||||||
* issue updated -> Rejected : calls _rollback_stage (== :rejected: comment).
|
set_issue_in_progress reset (bug 3 fix).
|
||||||
* the :approved: / :rejected: COMMENT mechanisms still work (both paths live).
|
* issue updated -> Rejected : calls _rollback_stage, with the reason pulled
|
||||||
|
from the issue's latest comment.
|
||||||
|
* COMMENTS NEVER trigger the pipeline: a :approved: / :rejected: comment is a
|
||||||
|
pure no-op (the comment-based control mechanism was removed).
|
||||||
|
|
||||||
We mock the shared engine entry points (_try_advance_stage / _rollback_stage)
|
We mock the shared engine entry points (_try_advance_stage / _rollback_stage)
|
||||||
and assert they fire for both the status and the comment trigger, so the two
|
and assert they fire ONLY for the status trigger, never for a comment.
|
||||||
mechanisms are proven to funnel into the same logic.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@@ -83,8 +85,21 @@ def _comment(text, plane_id="v-1"):
|
|||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeResp:
|
||||||
|
def __init__(self, status_code, payload):
|
||||||
|
self.status_code = status_code
|
||||||
|
self._payload = payload
|
||||||
|
|
||||||
|
def json(self):
|
||||||
|
return self._payload
|
||||||
|
|
||||||
|
|
||||||
|
def _comments_response(comments):
|
||||||
|
return _FakeResp(200, {"results": comments})
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
# Approved status -> advance
|
# Approved status -> advance (no in_progress reset)
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
@patch("src.plane_sync.set_issue_in_progress")
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||||
@@ -95,36 +110,52 @@ def test_approved_status_advances(mock_advance, mock_sip):
|
|||||||
# advanced the right task (ET-500 at review)
|
# advanced the right task (ET-500 at review)
|
||||||
args = mock_advance.call_args.args
|
args = mock_advance.call_args.args
|
||||||
assert "ET-500" in args # work_item_id is passed positionally
|
assert "ET-500" in args # work_item_id is passed positionally
|
||||||
|
# bug 3 fix: handle_verdict no longer resets the status to In Progress.
|
||||||
|
mock_sip.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@patch("src.plane_sync.set_issue_in_progress")
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||||
def test_approved_comment_still_advances(mock_advance, mock_sip):
|
def test_approved_comment_is_noop(mock_advance, mock_rollback, mock_sip):
|
||||||
|
"""Status-only model: a :approved: comment NEVER advances the pipeline."""
|
||||||
resp = _comment(":approved:")
|
resp = _comment(":approved:")
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
mock_advance.assert_awaited_once()
|
mock_advance.assert_not_called()
|
||||||
|
mock_rollback.assert_not_called()
|
||||||
|
mock_sip.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
# Rejected status -> rollback
|
# Rejected status -> rollback (reason from latest comment)
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
|
@patch("src.webhooks.plane.httpx.get")
|
||||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
def test_rejected_status_rolls_back(mock_rollback):
|
def test_rejected_status_rolls_back(mock_rollback, mock_get):
|
||||||
|
mock_get.return_value = _comments_response(
|
||||||
|
[{"comment_stripped": "ADR missing tradeoffs",
|
||||||
|
"created_at": "2026-06-03T10:00:00Z"}]
|
||||||
|
)
|
||||||
resp = _status(REJECTED)
|
resp = _status(REJECTED)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
mock_rollback.assert_awaited_once()
|
mock_rollback.assert_awaited_once()
|
||||||
# reason note for a status reject (no inline reason available)
|
# reason pulled from the latest comment
|
||||||
kwargs_reason = mock_rollback.call_args.args[-1]
|
reason = mock_rollback.call_args.args[-1]
|
||||||
assert "rejected via status" in kwargs_reason
|
assert "ADR missing tradeoffs" in reason
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.httpx.get")
|
||||||
|
@patch("src.plane_sync.set_issue_in_progress")
|
||||||
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._rollback_stage", new_callable=AsyncMock)
|
||||||
def test_rejected_comment_still_rolls_back(mock_rollback):
|
@patch("src.webhooks.plane._try_advance_stage", new_callable=AsyncMock)
|
||||||
|
def test_rejected_comment_is_noop(mock_advance, mock_rollback, mock_sip, mock_get):
|
||||||
|
"""Status-only model: a :rejected: comment NEVER rolls back the pipeline."""
|
||||||
resp = _comment(":rejected: bad ADR")
|
resp = _comment(":rejected: bad ADR")
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
mock_rollback.assert_awaited_once()
|
mock_advance.assert_not_called()
|
||||||
reason = mock_rollback.call_args.args[-1]
|
mock_rollback.assert_not_called()
|
||||||
assert "bad ADR" in reason
|
mock_sip.assert_not_called()
|
||||||
|
mock_get.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
# --------------------------------------------------------------------------- #
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
from unittest.mock import patch, MagicMock, AsyncMock
|
from unittest.mock import patch, MagicMock, AsyncMock
|
||||||
@@ -95,27 +96,32 @@ def test_plane_webhook_generates_sequential_ids(mock_docs, mock_branch):
|
|||||||
assert ids[1] == "ET-002"
|
assert ids[1] == "ET-002"
|
||||||
|
|
||||||
|
|
||||||
|
APPROVED_STATE = "a519a341-dada-4a91-8910-7604f82b79c5"
|
||||||
|
REJECTED_STATE = "ba958f3c-5db5-461d-8f82-89425e413b97"
|
||||||
|
|
||||||
|
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane.launcher")
|
@patch("src.webhooks.plane.launcher")
|
||||||
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
|
def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tmp_path, monkeypatch):
|
||||||
"""Comment :approved: at stage=analysis → advance to architecture."""
|
"""Status-only model: Approved STATUS at stage=analysis -> advance to
|
||||||
|
architecture. A comment never triggers this.
|
||||||
|
"""
|
||||||
# Patch repos_dir for QG check
|
# Patch repos_dir for QG check
|
||||||
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
|
monkeypatch.setattr("src.qg.checks.settings.repos_dir", str(tmp_path))
|
||||||
|
|
||||||
# Create task first
|
# Seed an analysis task directly (creation no longer makes a task post-PR#11).
|
||||||
client.post("/webhook/plane", json={
|
|
||||||
"event": "work_item.created",
|
|
||||||
"data": {"id": "adv-001", "name": "Advance test", "project": "proj-1"}
|
|
||||||
})
|
|
||||||
|
|
||||||
# Get the task to find work_item_id
|
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'adv-001'").fetchone()
|
conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("adv-001", "ET-001", "enduro-trails", "feature/ET-001-x", "analysis", "adv-001"),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
work_item_id = task["work_item_id"]
|
work_item_id = "ET-001"
|
||||||
|
|
||||||
# Create required analysis files
|
# Create required analysis files so the analysis QG passes.
|
||||||
wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
|
wi_dir = tmp_path / "enduro-trails" / "docs" / "work-items" / work_item_id
|
||||||
wi_dir.mkdir(parents=True)
|
wi_dir.mkdir(parents=True)
|
||||||
(wi_dir / "01-brd.md").write_text("# BRD")
|
(wi_dir / "01-brd.md").write_text("# BRD")
|
||||||
@@ -123,16 +129,15 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
|
|||||||
(wi_dir / "03-acceptance-criteria.md").write_text("# AC")
|
(wi_dir / "03-acceptance-criteria.md").write_text("# AC")
|
||||||
(wi_dir / "04-test-plan.yaml").write_text("tests: []")
|
(wi_dir / "04-test-plan.yaml").write_text("tests: []")
|
||||||
|
|
||||||
# Mock launcher
|
|
||||||
mock_launcher.launch.return_value = 1
|
mock_launcher.launch.return_value = 1
|
||||||
|
|
||||||
# Send approved comment
|
# Send Approved STATUS change.
|
||||||
resp = client.post("/webhook/plane", json={
|
resp = client.post("/webhook/plane", json={
|
||||||
"event": "comment.created",
|
"event": "issue", "action": "updated",
|
||||||
"data": {
|
"data": {
|
||||||
"work_item_id": "adv-001",
|
"id": "adv-001", "name": "Advance test", "project": "proj-1",
|
||||||
"comment": "Looks good :approved:"
|
"state": {"id": APPROVED_STATE, "name": "Approved", "group": "completed"},
|
||||||
}
|
},
|
||||||
})
|
})
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@@ -143,29 +148,39 @@ def test_plane_approved_advances_stage(mock_launcher, mock_docs, mock_branch, tm
|
|||||||
assert task["stage"] == "architecture"
|
assert task["stage"] == "architecture"
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.plane.httpx.get")
|
||||||
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_gitea_branch", new_callable=AsyncMock)
|
||||||
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
@patch("src.webhooks.plane._create_initial_docs", new_callable=AsyncMock)
|
||||||
def test_plane_rejected_rolls_back(mock_docs, mock_branch):
|
def test_plane_rejected_rolls_back(mock_docs, mock_branch, mock_get):
|
||||||
"""Comment :rejected: rolls back stage."""
|
"""Status-only model: Rejected STATUS rolls back stage. A comment never
|
||||||
# Create task
|
triggers this; the reason is pulled from the latest comment.
|
||||||
client.post("/webhook/plane", json={
|
"""
|
||||||
"event": "work_item.created",
|
class _R:
|
||||||
"data": {"id": "rej-001", "name": "Reject test", "project": "proj-1"}
|
status_code = 200
|
||||||
})
|
@staticmethod
|
||||||
|
def json():
|
||||||
|
return {"results": [
|
||||||
|
{"comment_stripped": "missing ADR", "created_at": "2026-06-03T10:00:00Z"}
|
||||||
|
]}
|
||||||
|
mock_get.return_value = _R()
|
||||||
|
|
||||||
# Manually set stage to architecture
|
# Seed an architecture task directly.
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
conn.execute("UPDATE tasks SET stage = 'architecture' WHERE plane_id = 'rej-001'")
|
conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) "
|
||||||
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
("rej-001", "ET-002", "enduro-trails", "feature/ET-002-x", "architecture", "rej-001"),
|
||||||
|
)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
# Send rejected comment
|
# Send Rejected STATUS change.
|
||||||
resp = client.post("/webhook/plane", json={
|
resp = client.post("/webhook/plane", json={
|
||||||
"event": "comment.created",
|
"event": "issue", "action": "updated",
|
||||||
"data": {
|
"data": {
|
||||||
"work_item_id": "rej-001",
|
"id": "rej-001", "name": "Reject test", "project": "proj-1",
|
||||||
"comment": "Not ready :rejected:"
|
"state": {"id": REJECTED_STATE, "name": "Rejected", "group": "cancelled"},
|
||||||
}
|
},
|
||||||
})
|
})
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@@ -258,6 +273,46 @@ def test_gitea_ci_success_advances_to_review(mock_launcher, mock_ci):
|
|||||||
assert task["stage"] == "review"
|
assert task["stage"] == "review"
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||||
|
@patch("src.webhooks.gitea.launcher")
|
||||||
|
def test_gitea_ci_failure_on_development_notifies_qg_failure(mock_launcher, mock_notify):
|
||||||
|
"""BUG 6: CI failure at development is now the authoritative QG gate failing.
|
||||||
|
|
||||||
|
It must notify QG failure (not silently suppress) and must NOT advance the stage.
|
||||||
|
"""
|
||||||
|
conn = get_db()
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) VALUES (?, ?, ?, ?, ?)",
|
||||||
|
("ci-fail-001", "ET-011", "enduro-trails", "feature/ET-011-test", "development"),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
resp = client.post(
|
||||||
|
"/webhook/gitea",
|
||||||
|
json={
|
||||||
|
"state": "failure",
|
||||||
|
"branches": [{"name": "feature/ET-011-test"}],
|
||||||
|
"repository": {"name": "enduro-trails"},
|
||||||
|
},
|
||||||
|
headers={"X-Gitea-Event": "status"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# QG failure was reported for the development stage with check_ci_green.
|
||||||
|
assert mock_notify.called
|
||||||
|
args, kwargs = mock_notify.call_args
|
||||||
|
call = list(args) + list(kwargs.values())
|
||||||
|
assert "development" in call
|
||||||
|
assert "check_ci_green" in call
|
||||||
|
|
||||||
|
# Stage did NOT advance.
|
||||||
|
conn = get_db()
|
||||||
|
task = conn.execute("SELECT * FROM tasks WHERE plane_id = 'ci-fail-001'").fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert task["stage"] == "development"
|
||||||
|
|
||||||
|
|
||||||
def test_gitea_webhook_pr():
|
def test_gitea_webhook_pr():
|
||||||
"""PR event is accepted."""
|
"""PR event is accepted."""
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
@@ -287,3 +342,158 @@ def test_plane_webhook_event_logged():
|
|||||||
conn.close()
|
conn.close()
|
||||||
assert event is not None
|
assert event is not None
|
||||||
assert event["source"] == "plane"
|
assert event["source"] == "plane"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# BUG 7: red CI on development must bounce the task back to the developer
|
||||||
|
# (capped retries, symmetric to review REQUEST_CHANGES). These are pure-logic
|
||||||
|
# tests: they invoke handle_ci_status() directly with mocked helpers so they do
|
||||||
|
# not pass through the TestClient HMAC barrier (baseline 401s are off-limits).
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _ci_failure_payload():
|
||||||
|
return {
|
||||||
|
"state": "failure",
|
||||||
|
"branches": [{"name": "feature/ET-011-test"}],
|
||||||
|
"repository": {"name": "enduro-trails"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _mock_db_with_retry_count(count):
|
||||||
|
"""Build a get_db() mock whose retry_count query returns `count`."""
|
||||||
|
conn = MagicMock()
|
||||||
|
conn.execute.return_value.fetchone.return_value = {"cnt": count}
|
||||||
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.gitea.notify_error")
|
||||||
|
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||||
|
@patch("src.webhooks.gitea.enqueue_job")
|
||||||
|
@patch("src.webhooks.gitea.update_task_stage")
|
||||||
|
@patch("src.webhooks.gitea.get_db")
|
||||||
|
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||||
|
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||||
|
def test_ci_failure_development_retries_developer_under_limit(
|
||||||
|
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||||
|
mock_enqueue, mock_qg, mock_err,
|
||||||
|
):
|
||||||
|
"""retry_count < MAX_DEV_RETRIES → relaunch developer, stage untouched."""
|
||||||
|
from src.webhooks.gitea import handle_ci_status
|
||||||
|
|
||||||
|
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||||
|
mock_task.return_value = {
|
||||||
|
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||||
|
}
|
||||||
|
mock_get_db.return_value = _mock_db_with_retry_count(0)
|
||||||
|
mock_enqueue.return_value = 42
|
||||||
|
|
||||||
|
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||||
|
|
||||||
|
# QG failure was still reported (Slava sees both the failure and the retry).
|
||||||
|
assert mock_qg.called
|
||||||
|
# developer was re-enqueued.
|
||||||
|
assert mock_enqueue.called
|
||||||
|
assert mock_enqueue.call_args[0][0] == "developer"
|
||||||
|
# No escalation.
|
||||||
|
assert not mock_err.called
|
||||||
|
# Stage stays on development — no update_task_stage in the CI-failure path.
|
||||||
|
assert not mock_update_stage.called
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.gitea.notify_error")
|
||||||
|
@patch("src.webhooks.gitea.notify_qg_failure")
|
||||||
|
@patch("src.webhooks.gitea.enqueue_job")
|
||||||
|
@patch("src.webhooks.gitea.update_task_stage")
|
||||||
|
@patch("src.webhooks.gitea.get_db")
|
||||||
|
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||||
|
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||||
|
def test_ci_failure_development_escalates_at_limit(
|
||||||
|
mock_proj, mock_task, mock_get_db, mock_update_stage,
|
||||||
|
mock_enqueue, mock_qg, mock_err,
|
||||||
|
):
|
||||||
|
"""retry_count >= MAX_DEV_RETRIES → escalate via notify_error, no relaunch."""
|
||||||
|
from src.webhooks.gitea import handle_ci_status, MAX_DEV_RETRIES
|
||||||
|
|
||||||
|
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||||
|
mock_task.return_value = {
|
||||||
|
"id": 1, "stage": "development", "work_item_id": "ET-011",
|
||||||
|
}
|
||||||
|
mock_get_db.return_value = _mock_db_with_retry_count(MAX_DEV_RETRIES)
|
||||||
|
|
||||||
|
asyncio.run(handle_ci_status(_ci_failure_payload()))
|
||||||
|
|
||||||
|
# QG failure still reported.
|
||||||
|
assert mock_qg.called
|
||||||
|
# developer NOT re-enqueued at the cap.
|
||||||
|
assert not mock_enqueue.called
|
||||||
|
# Escalation message mentions CI failure.
|
||||||
|
assert mock_err.called
|
||||||
|
err_msg = " ".join(str(a) for a in mock_err.call_args[0])
|
||||||
|
assert "Max developer retries" in err_msg
|
||||||
|
assert "after CI failure" in err_msg
|
||||||
|
# Stage untouched.
|
||||||
|
assert not mock_update_stage.called
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# BUG 8 (second door): a merged-PR webhook must NOT fake-complete a task that is
|
||||||
|
# still in the deploy stage. On `deploy` done is gated by the deployer's verdict
|
||||||
|
# (check_deploy_status via advance_stage), not by the merge event. For every
|
||||||
|
# other stage the merge->done behaviour is preserved. Pure-logic tests: invoke
|
||||||
|
# handle_pr() directly with mocked helpers (no HMAC barrier).
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _merged_pr_payload(branch="feature/ET-012-x"):
|
||||||
|
return {
|
||||||
|
"action": "closed",
|
||||||
|
"pull_request": {
|
||||||
|
"merged": True,
|
||||||
|
"number": 7,
|
||||||
|
"head": {"ref": branch},
|
||||||
|
},
|
||||||
|
"repository": {"name": "enduro-trails"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.gitea.notify_stage_change")
|
||||||
|
@patch("src.webhooks.gitea.update_task_stage")
|
||||||
|
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||||
|
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||||
|
def test_merge_on_deploy_stage_does_not_set_done(
|
||||||
|
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||||
|
):
|
||||||
|
"""FIX 1: merge at deploy stage is ignored — done is gated by deployer verdict."""
|
||||||
|
from src.webhooks.gitea import handle_pr
|
||||||
|
|
||||||
|
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||||
|
mock_task.return_value = {
|
||||||
|
"id": 1, "stage": "deploy", "work_item_id": "ET-012",
|
||||||
|
}
|
||||||
|
|
||||||
|
asyncio.run(handle_pr(_merged_pr_payload()))
|
||||||
|
|
||||||
|
# The merge-driven done path must NOT run on deploy.
|
||||||
|
assert not mock_update_stage.called
|
||||||
|
assert not mock_notify.called
|
||||||
|
|
||||||
|
|
||||||
|
@patch("src.webhooks.gitea.notify_stage_change")
|
||||||
|
@patch("src.webhooks.gitea.update_task_stage")
|
||||||
|
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||||
|
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||||
|
def test_merge_on_non_deploy_stage_sets_done(
|
||||||
|
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||||
|
):
|
||||||
|
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
|
||||||
|
from src.webhooks.gitea import handle_pr
|
||||||
|
|
||||||
|
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||||
|
mock_task.return_value = {
|
||||||
|
"id": 2, "stage": "review", "work_item_id": "ET-013",
|
||||||
|
}
|
||||||
|
|
||||||
|
asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))
|
||||||
|
|
||||||
|
# Non-deploy stages still get the merge-driven done.
|
||||||
|
mock_update_stage.assert_called_once_with(2, "done")
|
||||||
|
assert mock_notify.called
|
||||||
|
|||||||
Reference in New Issue
Block a user