Compare commits
8 Commits
fix/ci-fai
...
fix/tracke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec9aa74492 | ||
| 3e5c74ce4f | |||
|
|
9a0298de9d | ||
| 2801983d7b | |||
|
|
61e26a8930 | ||
| 2629dffe1b | |||
|
|
e4a9c48395 | ||
| a0621b9952 |
@@ -699,12 +699,49 @@ class AgentLauncher:
|
||||
task_id, work_item_id = row[0], row[1]
|
||||
if not work_item_id:
|
||||
return
|
||||
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
|
||||
# Observability: every agent's finish comment links its artifact(s)
|
||||
# (reviewer->12-review, tester->13-test-report, deployer->14-deploy-log,
|
||||
# architect->ADR, developer->PR/branch). For the developer we resolve the
|
||||
# open PR number so the link points straight at it.
|
||||
pr_number = None
|
||||
if agent == "developer":
|
||||
pr_number = self._open_pr_number(repo, branch)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
usage_comment(
|
||||
agent,
|
||||
usage,
|
||||
repo=repo,
|
||||
branch=branch,
|
||||
work_item_id=work_item_id,
|
||||
pr_number=pr_number,
|
||||
),
|
||||
author=agent,
|
||||
)
|
||||
if agent == "deployer":
|
||||
plane_add_comment(
|
||||
work_item_id, task_summary_comment(task_id), author="deployer"
|
||||
)
|
||||
|
||||
def _open_pr_number(self, repo: str, branch: str):
|
||||
"""Return the open PR number for `branch`, or None. Never raises."""
|
||||
try:
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
resp = httpx.get(
|
||||
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=5,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
prs = resp.json()
|
||||
if prs:
|
||||
return prs[0].get("number")
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
|
||||
90
src/db.py
90
src/db.py
@@ -83,7 +83,32 @@ def init_db():
|
||||
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
||||
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
|
||||
# reports the real input split across input_tokens (fresh, ~tens) +
|
||||
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
|
||||
# (writing new cache). Without this column the cache_creation slice is lost
|
||||
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
||||
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
||||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||||
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
|
||||
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
|
||||
# can render a short model tag per stage. Parsed from the run-log result JSON
|
||||
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
|
||||
_ensure_column(conn, "agent_runs", "model", "TEXT")
|
||||
# Telegram live tracker: one editable Telegram message per task. We store its
|
||||
# message_id so each stage transition can editMessageText the same message
|
||||
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
|
||||
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
|
||||
# Telegram live tracker: human-readable task title for the tracker header
|
||||
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
|
||||
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
|
||||
_ensure_column(conn, "tasks", "title", "TEXT")
|
||||
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
|
||||
# between "BRD ready / approve requested" and the analysis->architecture
|
||||
# advance (human flipped Plane to Approved). Persisted on the task so the
|
||||
# tracker can show "твоё время" without recomputing from activity history.
|
||||
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
||||
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -131,6 +156,71 @@ def update_task_stage(task_id: int, stage: str):
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Telegram live tracker helpers (feat/telegram-live-tracker)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_tracker_message_id(task_id: int) -> int | None:
|
||||
"""Return the stored Telegram tracker message_id for a task, or None."""
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return row[0] if row and row[0] is not None else None
|
||||
|
||||
|
||||
def set_tracker_message_id(task_id: int, message_id: int) -> None:
|
||||
"""Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET tracker_message_id=? WHERE id=?",
|
||||
(message_id, task_id),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_brd_review_started(task_id: int) -> None:
|
||||
"""Stamp when BRD review (the human approve gate) started, if not already set.
|
||||
|
||||
Idempotent: only sets it the first time (a retried analyst run must not reset
|
||||
the clock). The delta to brd_review_ended_at is the only "твоё время".
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET brd_review_started_at=datetime('now') "
|
||||
"WHERE id=? AND brd_review_started_at IS NULL",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_brd_review_ended(task_id: int) -> None:
|
||||
"""Stamp when BRD review ended (analysis->architecture advance / Approved).
|
||||
|
||||
Idempotent: only sets it the first time and only if a start exists.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET brd_review_ended_at=datetime('now') "
|
||||
"WHERE id=? AND brd_review_started_at IS NOT NULL "
|
||||
"AND brd_review_ended_at IS NULL",
|
||||
(task_id,),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
||||
|
||||
|
||||
@@ -1,6 +1,24 @@
|
||||
"""Notifications and logging for orchestrator events."""
|
||||
"""Notifications and logging for orchestrator events.
|
||||
|
||||
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
|
||||
messages per task (agent start / finish / stage transition / QG-pending / tech
|
||||
noise), the orchestrator now maintains ONE live tracker message per task that is
|
||||
edited in place (editMessageText) on every stage transition. Only events that
|
||||
NEED Slava's attention are sent as SEPARATE, notifying messages:
|
||||
|
||||
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
|
||||
* deploy failed / rolled back — send_telegram from launcher/engine
|
||||
* agent failed (exit_code != 0) — send_telegram from launcher
|
||||
* task error (notify_error)
|
||||
|
||||
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
|
||||
agent-start, agent-finish and QG-pending no longer emit their own messages — they
|
||||
just refresh the tracker (or are log-only).
|
||||
"""
|
||||
|
||||
import html
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger("orchestrator")
|
||||
@@ -17,25 +35,115 @@ def _get_settings():
|
||||
return _settings
|
||||
|
||||
|
||||
def send_telegram(text: str):
|
||||
"""Send notification to Telegram. Fire-and-forget, never raises."""
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Low-level Telegram primitives
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def send_telegram(text: str, disable_notification: bool = False):
|
||||
"""Send a notification to Telegram. Fire-and-forget, never raises.
|
||||
|
||||
Returns the Telegram message_id on success, else None (so callers that want
|
||||
to track the message — the tracker — can store it; legacy callers ignore it).
|
||||
"""
|
||||
s = _get_settings()
|
||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||
return
|
||||
return None
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
|
||||
httpx.post(
|
||||
resp = httpx.post(
|
||||
url,
|
||||
json={
|
||||
"chat_id": s.telegram_chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": False,
|
||||
"disable_notification": disable_notification,
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
data = resp.json()
|
||||
if data.get("ok"):
|
||||
return data["result"]["message_id"]
|
||||
except Exception:
|
||||
pass # Never crash orchestrator due to notification failure
|
||||
return None
|
||||
|
||||
|
||||
# edit_telegram outcome codes -> let update_task_tracker decide what to do:
|
||||
# "ok" edit applied -> nothing else to do
|
||||
# "not_modified" Telegram says text is identical (400 "message is not
|
||||
# modified" / "exactly the same") -> success, NO new message
|
||||
# "gone" original message can't be edited (deleted / too old /
|
||||
# invalid id) -> caller must fall back to a NEW message
|
||||
# "failed" transient failure (network / timeout / 5xx / unknown 400)
|
||||
# -> caller must NOT send a new message (avoid duplicates)
|
||||
EDIT_OK = "ok"
|
||||
EDIT_NOT_MODIFIED = "not_modified"
|
||||
EDIT_GONE = "gone"
|
||||
EDIT_FAILED = "failed"
|
||||
|
||||
# Telegram error descriptions that mean the message is permanently un-editable
|
||||
# (it is gone / orphaned) -> fall back to a fresh message.
|
||||
_GONE_MARKERS = (
|
||||
"message to edit not found",
|
||||
"message can't be edited",
|
||||
"message_id_invalid",
|
||||
)
|
||||
# Telegram "nothing changed" -> treat as success, never a duplicate.
|
||||
_NOT_MODIFIED_MARKERS = (
|
||||
"message is not modified",
|
||||
"exactly the same",
|
||||
)
|
||||
|
||||
|
||||
def edit_telegram(message_id: int, text: str) -> str:
|
||||
"""Edit an existing Telegram message. Never raises.
|
||||
|
||||
Returns a distinguishable outcome (see EDIT_* constants) so the caller can
|
||||
tell apart "all good" / "nothing changed" / "message gone" / "transient
|
||||
failure" and only fall back to a NEW message when the original is truly gone.
|
||||
"""
|
||||
s = _get_settings()
|
||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||
return EDIT_FAILED
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
|
||||
resp = httpx.post(
|
||||
url,
|
||||
json={
|
||||
"chat_id": s.telegram_chat_id,
|
||||
"message_id": message_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
data = resp.json()
|
||||
if data.get("ok"):
|
||||
return EDIT_OK
|
||||
# ok:false -> inspect the description to classify the 400.
|
||||
desc = str(data.get("description") or "").lower()
|
||||
if any(m in desc for m in _NOT_MODIFIED_MARKERS):
|
||||
# Text is identical between transitions (e.g. repeat review cycle
|
||||
# renders the same line). Nothing to do, NOT a duplicate.
|
||||
logger.debug(
|
||||
f"edit_telegram(mid={message_id}): not modified, skipping"
|
||||
)
|
||||
return EDIT_NOT_MODIFIED
|
||||
if any(m in desc for m in _GONE_MARKERS):
|
||||
logger.warning(
|
||||
f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
|
||||
f"will fall back to a new message"
|
||||
)
|
||||
return EDIT_GONE
|
||||
# Unknown 400 / other non-ok -> transient/unknown, do NOT duplicate.
|
||||
logger.warning(
|
||||
f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
|
||||
)
|
||||
return EDIT_FAILED
|
||||
except Exception as e:
|
||||
# Network / timeout / 5xx -> transient, do NOT duplicate.
|
||||
logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
|
||||
return EDIT_FAILED
|
||||
|
||||
|
||||
def _get_work_item_id(task_id: int) -> str:
|
||||
@@ -50,26 +158,355 @@ def _get_work_item_id(task_id: int) -> str:
|
||||
return f"task-{task_id}"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Live task tracker
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
# Pipeline stages shown in the tracker, in order, with their display label and
|
||||
# the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT
|
||||
# an agent stage — it is the human approve gate rendered between Analysis and
|
||||
# Architecture from the task's brd_review_* timestamps.
|
||||
_TRACKER_STAGES = [
|
||||
("analysis", "Analysis", "analyst"),
|
||||
("architecture", "Architecture", "architect"),
|
||||
("development", "Development", "developer"),
|
||||
("review", "Review", "reviewer"),
|
||||
("testing", "Testing", "tester"),
|
||||
("deploy", "Deploy", "deployer"),
|
||||
]
|
||||
|
||||
# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
|
||||
# (development is entered after architecture finishes, etc.) Used to render the
|
||||
# "🔄 <Stage> … идёт" line for the currently-active stage.
|
||||
_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД"
|
||||
|
||||
_STAGE_ACTIVE_AGENT = {
|
||||
"analysis": "analyst",
|
||||
"architecture": "architect",
|
||||
"development": "developer",
|
||||
"review": "reviewer",
|
||||
"testing": "tester",
|
||||
"deploy": "deployer",
|
||||
}
|
||||
|
||||
|
||||
def _fmt_minutes(seconds) -> str:
|
||||
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
|
||||
try:
|
||||
seconds = int(seconds or 0)
|
||||
except (TypeError, ValueError):
|
||||
seconds = 0
|
||||
if seconds <= 0:
|
||||
return "0м"
|
||||
if seconds < 60:
|
||||
return "<1м"
|
||||
return f"{seconds // 60}\u043c"
|
||||
|
||||
|
||||
def _parse_sql_ts(ts):
|
||||
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
|
||||
if not ts:
|
||||
return None
|
||||
from datetime import datetime, timezone
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||||
try:
|
||||
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def _duration_seconds(started, finished):
|
||||
"""Seconds between two SQL timestamps; None if either is missing/unparseable."""
|
||||
a = _parse_sql_ts(started)
|
||||
b = _parse_sql_ts(finished)
|
||||
if a is None or b is None:
|
||||
return None
|
||||
return max(int((b - a).total_seconds()), 0)
|
||||
|
||||
|
||||
def render_task_tracker(task_id: int) -> str:
|
||||
"""Build the full live-tracker text for a task from the DB (stateless render).
|
||||
|
||||
Pulls the task header (work_item_id, title, stage), every agent_runs row, and
|
||||
the BRD-review timestamps, then renders:
|
||||
- one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost> · <model>' line per finished
|
||||
stage (latest run per stage),
|
||||
- the '⏸️ Ревью БРД <dur> · твоё время[ ⏳]' line between Analysis/Architecture,
|
||||
- a '🔄 <Stage> … идёт' line for the active (in-progress) stage,
|
||||
- the '💰 <in>↓ / <out>↑ · <cost>' totals,
|
||||
- on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line.
|
||||
|
||||
Never raises (returns a minimal fallback string on error).
|
||||
"""
|
||||
from .db import get_db
|
||||
from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name
|
||||
|
||||
try:
|
||||
conn = get_db()
|
||||
task = conn.execute(
|
||||
"SELECT id, work_item_id, title, stage, created_at, updated_at, "
|
||||
"brd_review_started_at, brd_review_ended_at "
|
||||
"FROM tasks WHERE id=?",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if not task:
|
||||
conn.close()
|
||||
return f"task-{task_id}"
|
||||
runs = conn.execute(
|
||||
"SELECT agent, started_at, finished_at, exit_code, input_tokens, "
|
||||
"output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
|
||||
"FROM agent_runs WHERE task_id=? ORDER BY id ASC",
|
||||
(task_id,),
|
||||
).fetchall()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
|
||||
return f"task-{task_id}"
|
||||
|
||||
work_item_id = task["work_item_id"] or f"task-{task_id}"
|
||||
title = task["title"] or work_item_id
|
||||
stage = task["stage"] or "created"
|
||||
done = stage == "done"
|
||||
|
||||
# Latest completed run per agent (a stage may have multiple runs on retry;
|
||||
# we show the most recent FINISHED, successful run for the stage line).
|
||||
last_done = {}
|
||||
agent_runs_by_agent = {}
|
||||
for r in runs:
|
||||
agent_runs_by_agent.setdefault(r["agent"], []).append(r)
|
||||
if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
|
||||
last_done[r["agent"]] = r
|
||||
|
||||
# Totals across ALL runs (every input/output token + cost counts).
|
||||
total_in = 0
|
||||
total_out = 0
|
||||
total_cost = 0.0
|
||||
agent_seconds = 0
|
||||
for r in runs:
|
||||
usage = {
|
||||
"input_tokens": r["input_tokens"],
|
||||
"cache_read_tokens": r["cache_read_tokens"],
|
||||
"cache_creation_tokens": r["cache_creation_tokens"],
|
||||
}
|
||||
total_in += _input_total(usage)
|
||||
total_out += int(r["output_tokens"] or 0)
|
||||
total_cost += float(r["cost_usd"] or 0.0)
|
||||
d = _duration_seconds(r["started_at"], r["finished_at"])
|
||||
if d is not None:
|
||||
agent_seconds += d
|
||||
|
||||
esc_title = html.escape(title)
|
||||
header = (
|
||||
f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
|
||||
if done
|
||||
else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
|
||||
)
|
||||
bar = "\u2501" * 22
|
||||
lines = [header, bar]
|
||||
|
||||
def _stage_line(label, run):
|
||||
usage = {
|
||||
"input_tokens": run["input_tokens"],
|
||||
"cache_read_tokens": run["cache_read_tokens"],
|
||||
"cache_creation_tokens": run["cache_creation_tokens"],
|
||||
}
|
||||
in_tok = fmt_tokens(_input_total(usage))
|
||||
out_tok = fmt_tokens(run["output_tokens"])
|
||||
cost = fmt_cost(run["cost_usd"])
|
||||
dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
|
||||
model = short_model_name(run["model"])
|
||||
model_suffix = f" \u00b7 {model}" if model else ""
|
||||
return (
|
||||
f"\u2705 {label:<13} {dur} \u00b7 "
|
||||
f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
|
||||
)
|
||||
|
||||
# BRD review line: between Analysis and Architecture, only once Analysis has
|
||||
# produced a run (i.e. the gate is live). Time = human review delta.
|
||||
brd_started = task["brd_review_started_at"]
|
||||
brd_ended = task["brd_review_ended_at"]
|
||||
review_seconds = _duration_seconds(brd_started, brd_ended)
|
||||
|
||||
for stage_key, label, agent in _TRACKER_STAGES:
|
||||
run = last_done.get(agent)
|
||||
# The stage is "in progress" only when it is the task's current stage AND
|
||||
# there is an unfinished run for its agent (the agent is actually still
|
||||
# working). A finished run with no in-flight run -> show the \u2705 result,
|
||||
# even if the task still sits in that stage (just-finished snapshot).
|
||||
agent_runs = agent_runs_by_agent.get(agent, [])
|
||||
has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
|
||||
is_active_stage = (
|
||||
_STAGE_ACTIVE_AGENT.get(stage) == agent
|
||||
and stage == stage_key
|
||||
and (has_inflight or run is None)
|
||||
)
|
||||
if is_active_stage:
|
||||
# Live "\U0001f504 ... \u0438\u0434\u0451\u0442" line. Count how many times THIS stage's
|
||||
# agent has run for this task; a 2nd+ run means we're re-doing the
|
||||
# stage (e.g. review->development->review), so show "\u043f\u043e\u043f\u044b\u0442\u043a\u0430 N"
|
||||
# to make the text change between cycles and to honestly show Slava
|
||||
# the stage is being re-worked.
|
||||
attempt = len(agent_runs)
|
||||
if attempt >= 2:
|
||||
lines.append(
|
||||
f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
|
||||
f"\u2026 \u0438\u0434\u0451\u0442"
|
||||
)
|
||||
else:
|
||||
lines.append(
|
||||
f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
|
||||
)
|
||||
elif run is not None:
|
||||
lines.append(_stage_line(label, run))
|
||||
# else: not started yet -> not shown.
|
||||
|
||||
# Insert the BRD review line right after Analysis.
|
||||
if stage_key == "analysis" and brd_started:
|
||||
brd_label = f"{_BRD_LABEL:<13}"
|
||||
if review_seconds is not None:
|
||||
dur = _fmt_minutes(review_seconds)
|
||||
lines.append(
|
||||
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
|
||||
)
|
||||
else:
|
||||
# Still waiting on the human (ended not stamped yet).
|
||||
from datetime import datetime, timezone
|
||||
start_dt = _parse_sql_ts(brd_started)
|
||||
waited = None
|
||||
if start_dt is not None:
|
||||
waited = int(
|
||||
(datetime.now(timezone.utc) - start_dt).total_seconds()
|
||||
)
|
||||
dur = _fmt_minutes(waited) if waited is not None else "\u2026"
|
||||
lines.append(
|
||||
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
|
||||
)
|
||||
|
||||
lines.append(bar)
|
||||
lines.append(
|
||||
f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
|
||||
f"{fmt_cost(total_cost)}"
|
||||
)
|
||||
|
||||
if done:
|
||||
wall = _duration_seconds(task["created_at"], task["updated_at"])
|
||||
wall_str = _fmt_minutes(wall) if wall is not None else "?"
|
||||
review_str = _fmt_minutes(review_seconds) if review_seconds else "0м"
|
||||
lines.append(
|
||||
f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
|
||||
f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
|
||||
f"\u0442\u0432\u043e\u0451 {review_str}"
|
||||
)
|
||||
link = _done_link(task_id, task["work_item_id"])
|
||||
if link:
|
||||
lines.append(link)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _done_link(task_id: int, work_item_id) -> str | None:
|
||||
"""Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None."""
|
||||
try:
|
||||
from .config import settings
|
||||
from .db import get_db
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if not row:
|
||||
return None
|
||||
repo, branch = row["repo"], row["branch"]
|
||||
pr_part = None
|
||||
try:
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
resp = httpx.get(
|
||||
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "all", "head": branch},
|
||||
headers=headers, timeout=5,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
prs = resp.json()
|
||||
if prs:
|
||||
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
|
||||
except Exception:
|
||||
pr_part = None
|
||||
parts = []
|
||||
if pr_part:
|
||||
parts.append(pr_part)
|
||||
parts.append("\U0001f4e6 deployed")
|
||||
return " \u00b7 ".join(parts)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def update_task_tracker(task_id: int):
|
||||
"""Render + push the live tracker for a task. Never raises.
|
||||
|
||||
First call (no stored tracker_message_id): sendMessage (silent) and store the
|
||||
returned message_id. Subsequent calls: editMessageText the stored message.
|
||||
A NEW message is sent ONLY when the original is truly gone (deleted / too old
|
||||
/ invalid id). On "not modified" (text unchanged) or transient failures
|
||||
(network / timeout / 5xx / unknown 400) we do NOT send a new message — that
|
||||
is exactly what produced duplicate trackers and orphaned (lagging) messages.
|
||||
The tracker is always sent with disable_notification so it never pings —
|
||||
only the dedicated alert helpers ping.
|
||||
"""
|
||||
try:
|
||||
from .db import get_tracker_message_id, set_tracker_message_id
|
||||
text = render_task_tracker(task_id)
|
||||
mid = get_tracker_message_id(task_id)
|
||||
if mid is not None:
|
||||
result = edit_telegram(mid, text)
|
||||
if result in (EDIT_OK, EDIT_NOT_MODIFIED):
|
||||
# Edited in place (or nothing to change) -> done, no duplicate.
|
||||
return
|
||||
if result == EDIT_FAILED:
|
||||
# Transient -> don't duplicate; tracker redraws next transition.
|
||||
logger.debug(
|
||||
f"update_task_tracker({task_id}): edit failed transiently, "
|
||||
f"keeping message {mid}"
|
||||
)
|
||||
return
|
||||
# result == EDIT_GONE -> the stored message is gone; fall through
|
||||
# to send a fresh one and re-point tracker_message_id at it.
|
||||
new_mid = send_telegram(text, disable_notification=True)
|
||||
if new_mid is not None:
|
||||
set_tracker_message_id(task_id, new_mid)
|
||||
except Exception as e:
|
||||
logger.warning(f"update_task_tracker({task_id}) failed: {e}")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
|
||||
"""Log and notify stage transition."""
|
||||
"""Log a stage transition and refresh the live tracker (no separate message)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}"
|
||||
if agent:
|
||||
msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_agent_started(run_id: int, agent: str, task_id: int):
|
||||
"""Notify agent launch."""
|
||||
"""Log an agent launch and refresh the tracker (no separate message)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
|
||||
if task_id:
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
||||
"""Notify agent completion."""
|
||||
"""Log agent completion and refresh the tracker (no separate message).
|
||||
|
||||
The agent-FAILED alert (exit_code != 0) is still sent separately by the
|
||||
launcher via send_telegram; this helper itself only logs + refreshes.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
||||
if exit_code == 0:
|
||||
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
||||
@@ -79,47 +516,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int
|
||||
else:
|
||||
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
if task_id:
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
|
||||
"""Notify QG check result."""
|
||||
"""Log a QG check result (NO separate Telegram message: QG-pending is noise).
|
||||
|
||||
Kept for callers; QG outcomes are log-only now and reflected by the tracker
|
||||
through the resulting stage transition.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
if passed:
|
||||
msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed"
|
||||
logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
|
||||
else:
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
|
||||
|
||||
|
||||
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
|
||||
"""Log and notify QG check failure."""
|
||||
"""Log a QG check failure (log-only).
|
||||
|
||||
QG-pending / QG-failed are NOT pinged as separate messages anymore (they are
|
||||
not actionable for Slava). Real rollbacks/deploy-fails are alerted by their
|
||||
own dedicated send_telegram calls in the engine/launcher.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.warning(msg)
|
||||
send_telegram(msg)
|
||||
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
|
||||
|
||||
|
||||
def notify_approve_requested(task_id: int):
|
||||
"""Notify that analyst requests :approved:."""
|
||||
"""ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved.
|
||||
|
||||
Also starts the BRD-review clock and refreshes the tracker so the
|
||||
'⏸️ Ревью БРД · твоё время ⏳' line appears.
|
||||
"""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane"
|
||||
try:
|
||||
from .db import mark_brd_review_started
|
||||
mark_brd_review_started(task_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"notify_approve_requested: brd clock start failed: {e}")
|
||||
msg = (
|
||||
f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
|
||||
f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
|
||||
)
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
update_task_tracker(task_id)
|
||||
send_telegram(msg) # separate, notifying
|
||||
|
||||
|
||||
def notify_done(task_id: int):
|
||||
"""Notify task completion."""
|
||||
"""Task completion: refresh the tracker to its final ГОТОВО form (no separate ping)."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!")
|
||||
update_task_tracker(task_id)
|
||||
|
||||
|
||||
def notify_error(task_id: int, error: str):
|
||||
"""Log and notify error for a task."""
|
||||
"""ALERT (separate, notifying): task error."""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "system"
|
||||
msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}"
|
||||
logger.error(msg)
|
||||
send_telegram(msg)
|
||||
send_telegram(msg) # separate, notifying
|
||||
|
||||
@@ -343,6 +343,17 @@ def set_issue_blocked(work_item_id: str, project_id: str = None):
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)
|
||||
|
||||
|
||||
def set_issue_done(work_item_id: str, project_id: str = None):
|
||||
"""Observability fix: force the issue into the TERMINAL Done state.
|
||||
|
||||
Used by the deploy->done success path so a completed task always reaches the
|
||||
terminal Plane state (it used to stick on In Progress because the merge
|
||||
webhook bypassed the stage engine). Uses the existing PLANE_STATES['done']
|
||||
UUID — the mapping itself is NOT changed.
|
||||
"""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["done"], project_id)
|
||||
|
||||
|
||||
def set_issue_in_progress(work_item_id: str, project_id: str = None):
|
||||
"""Set issue to 'In Progress' state — agent working."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)
|
||||
|
||||
@@ -281,6 +281,44 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
|
||||
return False, f"Local test run error: {e}"
|
||||
|
||||
|
||||
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
|
||||
"""
|
||||
БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable
|
||||
verdict in 14-deploy-log.md frontmatter, NOT on the LLM process exit code
|
||||
(which is always 0 on a successful agent session even when the deploy failed).
|
||||
|
||||
Mirrors check_reviewer_verdict (S-5): reads ONLY `deploy_status:` from YAML
|
||||
frontmatter. Returns:
|
||||
(True, ...) -> deploy_status: SUCCESS
|
||||
(False, ...) -> deploy_status: FAILED, missing field, or no frontmatter
|
||||
"""
|
||||
import yaml
|
||||
repo_path = _repo_path(repo, branch)
|
||||
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
|
||||
|
||||
if not os.path.isfile(log_path):
|
||||
return False, "Deploy log not found (14-deploy-log.md)"
|
||||
try:
|
||||
with open(log_path, "r") as f:
|
||||
content = f.read()
|
||||
status = None
|
||||
if content.startswith("---"):
|
||||
parts = content.split("---", 2)
|
||||
if len(parts) >= 3:
|
||||
try:
|
||||
fm = yaml.safe_load(parts[1]) or {}
|
||||
except yaml.YAMLError as e:
|
||||
return False, f"Invalid YAML frontmatter in deploy log: {e}"
|
||||
status = str(fm.get("deploy_status", "")).upper().strip()
|
||||
if status == "SUCCESS":
|
||||
return True, "Deploy status: SUCCESS"
|
||||
if status == "FAILED":
|
||||
return False, "Deploy status: FAILED"
|
||||
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
|
||||
except OSError as e:
|
||||
return False, f"Error reading deploy log: {e}"
|
||||
|
||||
|
||||
# Registry for dynamic lookup by name
|
||||
QG_CHECKS = {
|
||||
"check_analysis_approved": check_analysis_approved,
|
||||
@@ -291,4 +329,5 @@ QG_CHECKS = {
|
||||
"check_tests_passed": check_tests_passed,
|
||||
"check_reviewer_verdict": check_reviewer_verdict,
|
||||
"check_tests_local": check_tests_local,
|
||||
"check_deploy_status": check_deploy_status,
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ from .plane_sync import (
|
||||
set_issue_needs_input,
|
||||
set_issue_in_progress,
|
||||
set_issue_blocked,
|
||||
set_issue_done,
|
||||
)
|
||||
from .config import settings
|
||||
|
||||
@@ -239,6 +240,15 @@ def advance_stage(
|
||||
|
||||
# --- Advance ---------------------------------------------------------
|
||||
update_task_stage(task_id, next_stage)
|
||||
# Telegram live tracker: the analysis->architecture advance is the human
|
||||
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
|
||||
# human time). Idempotent: only the first stamp counts.
|
||||
if current_stage == "analysis" and next_stage == "architecture":
|
||||
try:
|
||||
from .db import mark_brd_review_ended
|
||||
mark_brd_review_ended(task_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Task {task_id}: brd review end stamp failed: {e}")
|
||||
notify_stage_change(task_id, current_stage, next_stage)
|
||||
plane_notify_stage(work_item_id, current_stage, next_stage)
|
||||
result.advanced = True
|
||||
@@ -247,6 +257,22 @@ def advance_stage(
|
||||
f"(auto-advance after {agent})"
|
||||
)
|
||||
|
||||
# --- Terminal sync: deploy -> done must reach Plane's Done -----------
|
||||
# When the deployer's check_deploy_status passes we advance to the
|
||||
# terminal 'done' stage. Previously a merged-PR webhook completed the
|
||||
# task out-of-band and Plane stuck on In Progress. Now done flows through
|
||||
# here, so explicitly drive the Plane issue into the terminal Done state
|
||||
# (PLANE_STATES['done'] — mapping unchanged) in addition to the
|
||||
# stage-change comment above.
|
||||
if next_stage == "done" and work_item_id:
|
||||
try:
|
||||
set_issue_done(work_item_id)
|
||||
logger.info(
|
||||
f"Task {task_id}: deploy->done, Plane state forced to Done"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Task {task_id}: failed to set Plane Done: {e}")
|
||||
|
||||
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
|
||||
next_agent = get_agent_for_stage(current_stage)
|
||||
if next_agent:
|
||||
@@ -490,3 +516,31 @@ def _handle_qg_failure_rollbacks(
|
||||
f"Task {task_id}: architect conflict, enqueued analyst "
|
||||
f"(job_id={new_job})"
|
||||
)
|
||||
|
||||
# БАГ 8: deployer verdict FAILED -> roll deploy back to development.
|
||||
# The launcher's exit_code-based guard (launcher.py:475) never fires because
|
||||
# the LLM process exit code is always 0; this gate fires on the machine-readable
|
||||
# deploy_status verdict in 14-deploy-log.md instead. Mirrors the launcher block
|
||||
# (rollback + set_issue_blocked + notify) but is driven by the VERDICT.
|
||||
if agent == "deployer" and qg_name == "check_deploy_status":
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
result.rolled_back_to = "development"
|
||||
set_issue_blocked(work_item_id)
|
||||
notify_qg_failure(task_id, "deploy", "check_deploy_status", reason)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c Deploy FAILED ({reason}). Rolled back to development. "
|
||||
f"Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430.",
|
||||
author="deployer",
|
||||
)
|
||||
send_telegram(
|
||||
f"\U0001f6a8 {work_item_id}: Deploy FAILED ({reason}). "
|
||||
f"Rolled back to development. Needs fix."
|
||||
)
|
||||
result.alerted = True
|
||||
logger.error(
|
||||
f"Task {task_id}: deployer verdict FAILED, rolled back deploy -> "
|
||||
f"development ({reason})"
|
||||
)
|
||||
|
||||
@@ -16,7 +16,7 @@ STAGE_TRANSITIONS = {
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
|
||||
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
||||
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
||||
"deploy": {"next": "done", "agent": None, "qg": None},
|
||||
"deploy": {"next": "done", "agent": None, "qg": "check_deploy_status"},
|
||||
"done": {"next": None, "agent": None, "qg": None},
|
||||
}
|
||||
|
||||
|
||||
224
src/usage.py
224
src/usage.py
@@ -31,7 +31,8 @@ def parse_usage_from_text(text: str) -> dict | None:
|
||||
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
|
||||
|
||||
Returns a normalised dict
|
||||
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
|
||||
{input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
|
||||
cost_usd}
|
||||
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
|
||||
"""
|
||||
if not text:
|
||||
@@ -71,10 +72,67 @@ def parse_usage_from_text(text: str) -> dict | None:
|
||||
"cache_read_tokens": _int(
|
||||
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
|
||||
),
|
||||
# The cache-CREATION slice (writing new cache entries) is part of the
|
||||
# REAL input and used to be dropped on the floor. Persist it so the
|
||||
# "X in" figure reflects the full prompt size, not just fresh tokens.
|
||||
"cache_creation_tokens": _int(
|
||||
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
|
||||
),
|
||||
"cost_usd": _float(cost),
|
||||
# Telegram live tracker: the model the run actually used. claude
|
||||
# --output-format json reports it under modelUsage (a dict keyed by the
|
||||
# full model id) and/or a top-level "model" field. We keep the FULL name
|
||||
# here; short_model_name() trims it for the tracker. None when unknown.
|
||||
"model": _extract_model(candidate),
|
||||
}
|
||||
|
||||
|
||||
def _extract_model(candidate: dict) -> str | None:
|
||||
"""Best-effort: pull the model id out of a claude result JSON object.
|
||||
|
||||
Prefers modelUsage (a dict keyed by full model ids, e.g.
|
||||
{"claude-opus-4-8": {...}}) and returns the key with the most output
|
||||
tokens; falls back to a top-level "model" string. Never raises -> None.
|
||||
"""
|
||||
try:
|
||||
mu = candidate.get("modelUsage")
|
||||
if isinstance(mu, dict) and mu:
|
||||
def _out(v):
|
||||
try:
|
||||
return int((v or {}).get("outputTokens", 0))
|
||||
except (TypeError, ValueError, AttributeError):
|
||||
return 0
|
||||
best = max(mu.items(), key=lambda kv: _out(kv[1]))
|
||||
if best and best[0]:
|
||||
return str(best[0])
|
||||
model = candidate.get("model")
|
||||
if isinstance(model, str) and model:
|
||||
return model
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def short_model_name(full: str | None) -> str:
|
||||
"""Trim a full model id to a short tag for the tracker.
|
||||
|
||||
'tokenator/claude-opus-4-8' -> 'opus-4-8'
|
||||
'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
|
||||
'claude-opus-4-8' -> 'opus-4-8'
|
||||
Returns '' when full is falsy so callers can omit the ' · <model>' suffix.
|
||||
"""
|
||||
if not full:
|
||||
return ""
|
||||
name = str(full).strip()
|
||||
# Drop any provider prefix up to and including the last '/'.
|
||||
if "/" in name:
|
||||
name = name.rsplit("/", 1)[-1]
|
||||
# Drop a leading 'claude-' marketing prefix.
|
||||
if name.startswith("claude-"):
|
||||
name = name[len("claude-"):]
|
||||
return name
|
||||
|
||||
|
||||
def _extract_last_json_object(text: str) -> dict | None:
|
||||
"""Return the last balanced top-level JSON object in `text` that parses.
|
||||
|
||||
@@ -150,12 +208,15 @@ def record_usage(run_id: int, usage: dict | None):
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
|
||||
"cache_read_tokens=?, cost_usd=? WHERE id=?",
|
||||
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
|
||||
"model=COALESCE(?, model) WHERE id=?",
|
||||
(
|
||||
usage.get("input_tokens"),
|
||||
usage.get("output_tokens"),
|
||||
usage.get("cache_read_tokens"),
|
||||
usage.get("cache_creation_tokens"),
|
||||
usage.get("cost_usd"),
|
||||
usage.get("model"),
|
||||
run_id,
|
||||
),
|
||||
)
|
||||
@@ -197,19 +258,132 @@ AGENT_DISPLAY = {
|
||||
}
|
||||
|
||||
|
||||
def usage_comment(agent: str, usage: dict | None) -> str:
|
||||
def _input_total(usage: dict) -> int:
|
||||
"""FULL input = fresh input + cache-read + cache-creation tokens."""
|
||||
def _i(k):
|
||||
try:
|
||||
return int(usage.get(k) or 0)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens")
|
||||
|
||||
|
||||
def _cached_total(usage: dict) -> int:
|
||||
"""Cached portion of the input = cache-read + cache-creation tokens."""
|
||||
def _i(k):
|
||||
try:
|
||||
return int(usage.get(k) or 0)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
return _i("cache_read_tokens") + _i("cache_creation_tokens")
|
||||
|
||||
|
||||
def fmt_in(usage: dict) -> str:
|
||||
"""Render the input figure as full total with a cached breakdown.
|
||||
|
||||
'8.5M in (8.4M cached)' when there is a cache; '45.2k in' when cached==0.
|
||||
"""
|
||||
total = _input_total(usage)
|
||||
cached = _cached_total(usage)
|
||||
if cached > 0:
|
||||
return f"{fmt_tokens(total)} in ({fmt_tokens(cached)} cached)"
|
||||
return f"{fmt_tokens(total)} in"
|
||||
|
||||
|
||||
def usage_comment(
|
||||
agent: str,
|
||||
usage: dict | None,
|
||||
repo: str | None = None,
|
||||
branch: str | None = None,
|
||||
work_item_id: str | None = None,
|
||||
pr_number=None,
|
||||
) -> str:
|
||||
"""Build the per-agent finish comment, e.g.
|
||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
|
||||
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 8.5M in (8.4M cached) / 45.8k out \u00b7 $7.29'.
|
||||
|
||||
When repo/branch/work_item_id are supplied, the agent's artifact link(s) are
|
||||
appended (BUG: only analyst used to link its docs). Missing artifacts are
|
||||
silently skipped — link building never raises.
|
||||
"""
|
||||
usage = usage or {}
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
icon = AGENT_ICON.get(agent, "\u2705")
|
||||
return (
|
||||
line = (
|
||||
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
|
||||
f"{fmt_tokens(usage.get('input_tokens'))} in / "
|
||||
f"{fmt_in(usage)} / "
|
||||
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
|
||||
f"{fmt_cost(usage.get('cost_usd'))}"
|
||||
)
|
||||
links = artifact_links(agent, repo, branch, work_item_id, pr_number)
|
||||
if links:
|
||||
line += "\n" + "\n".join(links)
|
||||
return line
|
||||
|
||||
|
||||
# Per-agent artifact file under docs/work-items/{wid}/ (architect/developer use
|
||||
# special handling for ADR dirs / PR links, see artifact_links()).
|
||||
AGENT_ARTIFACT = {
|
||||
"reviewer": ("Review", "12-review.md"),
|
||||
"tester": ("Test report", "13-test-report.md"),
|
||||
"deployer": ("Deploy log", "14-deploy-log.md"),
|
||||
}
|
||||
|
||||
|
||||
def artifact_links(
|
||||
agent: str,
|
||||
repo: str | None,
|
||||
branch: str | None,
|
||||
work_item_id: str | None,
|
||||
pr_number=None,
|
||||
) -> list[str]:
|
||||
"""Markdown link(s) to the finishing agent's artifact(s) in Gitea.
|
||||
|
||||
Uses gitea_public_url (falls back to gitea_url) for clickable links, mirroring
|
||||
the analyst doc links. Returns [] (never raises) when there is nothing to
|
||||
link or the required context is missing. analyst is intentionally NOT handled
|
||||
here — its richer doc list lives in stage_engine._build_analyst_ready_comment.
|
||||
"""
|
||||
try:
|
||||
from .config import settings
|
||||
owner = getattr(settings, "gitea_owner", "admin")
|
||||
base = (
|
||||
getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "")
|
||||
).rstrip("/")
|
||||
if not base or not repo:
|
||||
return []
|
||||
links: list[str] = []
|
||||
|
||||
if agent == "developer":
|
||||
if branch:
|
||||
links.append(
|
||||
f"\U0001f4c2 [Branch {branch}]({base}/{owner}/{repo}/src/branch/{branch})"
|
||||
)
|
||||
if pr_number:
|
||||
links.append(
|
||||
f"\U0001f517 [PR #{pr_number}]({base}/{owner}/{repo}/pulls/{pr_number})"
|
||||
)
|
||||
return links
|
||||
|
||||
if agent == "architect":
|
||||
if branch and work_item_id:
|
||||
adr_dir = (
|
||||
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||
f"docs/work-items/{work_item_id}/06-adr"
|
||||
)
|
||||
links.append(f"\U0001f4d0 [ADR]({adr_dir})")
|
||||
return links
|
||||
|
||||
spec = AGENT_ARTIFACT.get(agent)
|
||||
if spec and branch and work_item_id:
|
||||
label, fname = spec
|
||||
href = (
|
||||
f"{base}/{owner}/{repo}/src/branch/{branch}/"
|
||||
f"docs/work-items/{work_item_id}/{fname}"
|
||||
)
|
||||
links.append(f"\U0001f4c4 [{label}]({href})")
|
||||
return links
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
AGENT_ICON = {
|
||||
@@ -225,13 +399,22 @@ AGENT_ICON = {
|
||||
def task_usage_summary(task_id: int) -> dict:
|
||||
"""Aggregate agent_runs usage for a task.
|
||||
|
||||
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
|
||||
total_in counts the FULL input (input + cache_read + cache_creation), and
|
||||
total_cached counts the cached portion (cache_read + cache_creation).
|
||||
COALESCE(...,0) keeps pre-existing rows (NULL cache_creation) from breaking.
|
||||
|
||||
Returns {total_in, total_cached, total_out, total_cost,
|
||||
per_agent: [(agent, in, cached, out, cost), ...]}.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT agent, "
|
||||
"COALESCE(SUM(input_tokens),0), "
|
||||
"COALESCE(SUM(input_tokens),0) "
|
||||
" + COALESCE(SUM(cache_read_tokens),0) "
|
||||
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||
"COALESCE(SUM(cache_read_tokens),0) "
|
||||
" + COALESCE(SUM(cache_creation_tokens),0), "
|
||||
"COALESCE(SUM(output_tokens),0), "
|
||||
"COALESCE(SUM(cost_usd),0.0) "
|
||||
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
|
||||
@@ -239,12 +422,14 @@ def task_usage_summary(task_id: int) -> dict:
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
|
||||
per_agent = [(r[0], int(r[1]), int(r[2]), int(r[3]), float(r[4])) for r in rows]
|
||||
total_in = sum(r[1] for r in per_agent)
|
||||
total_out = sum(r[2] for r in per_agent)
|
||||
total_cost = sum(r[3] for r in per_agent)
|
||||
total_cached = sum(r[2] for r in per_agent)
|
||||
total_out = sum(r[3] for r in per_agent)
|
||||
total_cost = sum(r[4] for r in per_agent)
|
||||
return {
|
||||
"total_in": total_in,
|
||||
"total_cached": total_cached,
|
||||
"total_out": total_out,
|
||||
"total_cost": total_cost,
|
||||
"per_agent": per_agent,
|
||||
@@ -254,15 +439,26 @@ def task_usage_summary(task_id: int) -> dict:
|
||||
def task_summary_comment(task_id: int) -> str:
|
||||
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
|
||||
s = task_usage_summary(task_id)
|
||||
cached = s.get("total_cached", 0)
|
||||
head_in = (
|
||||
f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434 ({fmt_tokens(cached)} cached)"
|
||||
if cached > 0
|
||||
else f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434"
|
||||
)
|
||||
lines = [
|
||||
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
|
||||
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
|
||||
f"{head_in} / "
|
||||
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
|
||||
f"{fmt_cost(s['total_cost'])}"
|
||||
]
|
||||
for agent, ti, to, cost in s["per_agent"]:
|
||||
for agent, ti, tc, to, cost in s["per_agent"]:
|
||||
name = AGENT_DISPLAY.get(agent, agent.capitalize())
|
||||
in_str = (
|
||||
f"{fmt_tokens(ti)} in ({fmt_tokens(tc)} cached)"
|
||||
if tc > 0
|
||||
else f"{fmt_tokens(ti)} in"
|
||||
)
|
||||
lines.append(
|
||||
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||
f"\u2022 {name}: {in_str} / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
@@ -334,6 +334,20 @@ async def handle_pr(payload: dict):
|
||||
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
|
||||
|
||||
elif action == "closed" and pr.get("merged", False):
|
||||
# BUG 8 (second door): at the deploy stage `done` is gated by the
|
||||
# deployer's verdict (check_deploy_status via advance_stage), NOT by the
|
||||
# fact that the PR was merged. The deployer merges the PR at the START of
|
||||
# its run, so a merged webhook arrives ~30s later while the deployer is
|
||||
# still working — blindly setting done here would fake-complete the task
|
||||
# and discard a later deploy_status: FAILED verdict. advance_stage will
|
||||
# drive deploy→done (and Plane→Done) when the deployer job finishes.
|
||||
# For every OTHER stage the merge-driven done behaviour is preserved.
|
||||
if current_stage == "deploy":
|
||||
logger.info(
|
||||
f"Task {task_id}: PR merged at deploy stage — done gated by "
|
||||
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
|
||||
)
|
||||
return
|
||||
update_task_stage(task_id, "done")
|
||||
notify_stage_change(task_id, current_stage, "done")
|
||||
logger.info(f"Task {task_id}: PR merged, stage → done")
|
||||
|
||||
@@ -494,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""):
|
||||
# Insert task into DB
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id),
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -18,6 +18,7 @@ from src.qg.checks import (
|
||||
check_review_approved,
|
||||
check_tests_passed,
|
||||
check_tests_local,
|
||||
check_deploy_status,
|
||||
)
|
||||
from src.stages import get_qg_for_stage
|
||||
|
||||
@@ -190,6 +191,65 @@ class TestCheckTestsPassed:
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
|
||||
class TestCheckDeployStatus:
|
||||
"""BUG 8: deploy -> done must be gated on the deployer's machine-readable
|
||||
deploy_status verdict in 14-deploy-log.md frontmatter, NOT the LLM exit code
|
||||
(always 0). Mirrors check_reviewer_verdict (reads ONLY the frontmatter field)."""
|
||||
|
||||
def _write_log(self, repo_dir, content):
|
||||
wi_dir = repo_dir / "docs" / "work-items" / "ET-011"
|
||||
wi_dir.mkdir(parents=True)
|
||||
(wi_dir / "14-deploy-log.md").write_text(content)
|
||||
|
||||
def test_success_verdict_passes(self, setup_work_item_dir):
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is True
|
||||
assert "SUCCESS" in reason
|
||||
|
||||
def test_failed_verdict_fails(self, setup_work_item_dir):
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\ndeploy_status: FAILED\nversion: v0.0.3\n---\n\npermission denied.\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
assert "FAILED" in reason
|
||||
|
||||
def test_no_file_fails(self, setup_work_item_dir):
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
assert "not found" in reason.lower()
|
||||
|
||||
def test_no_field_fails(self, setup_work_item_dir):
|
||||
# Frontmatter present but no deploy_status field -> must NOT pass.
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"---\nversion: v0.0.3\n---\n\nStatus: FAILED (prose only).\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
|
||||
def test_prose_only_no_frontmatter_fails(self, setup_work_item_dir):
|
||||
# Prose mentioning SUCCESS but no machine-readable frontmatter -> fail.
|
||||
self._write_log(
|
||||
setup_work_item_dir,
|
||||
"# Deploy log\n\nStatus: SUCCESS (prose, not frontmatter).\n",
|
||||
)
|
||||
passed, reason = check_deploy_status("enduro-trails", "ET-011")
|
||||
assert passed is False
|
||||
|
||||
def test_deploy_stage_qg_is_check_deploy_status(self):
|
||||
assert get_qg_for_stage("deploy") == "check_deploy_status"
|
||||
|
||||
def test_registered_in_qg_checks(self):
|
||||
from src.qg.checks import QG_CHECKS
|
||||
assert QG_CHECKS.get("check_deploy_status") is check_deploy_status
|
||||
|
||||
|
||||
class TestDevelopmentStageQG:
|
||||
"""BUG 6: development stage QG is now check_ci_green (CI is the authoritative
|
||||
gate), not the deprecated check_tests_local."""
|
||||
|
||||
@@ -69,6 +69,7 @@ def silence_side_effects(monkeypatch):
|
||||
"set_issue_needs_input",
|
||||
"set_issue_in_progress",
|
||||
"set_issue_blocked",
|
||||
"set_issue_done",
|
||||
):
|
||||
monkeypatch.setattr(stage_engine, name, MagicMock())
|
||||
|
||||
@@ -177,6 +178,40 @@ class TestHappyPathAgentSelection:
|
||||
assert res.enqueued_agent is None
|
||||
assert _jobs() == []
|
||||
|
||||
def test_deploy_success_syncs_plane_to_terminal_done(self, monkeypatch):
|
||||
"""FIX 3: a successful deploy->done forces the Plane issue to terminal Done.
|
||||
|
||||
Previously the task could stick on In Progress because the merge webhook
|
||||
completed it out-of-band. Now the engine drives set_issue_done() on the
|
||||
deploy->done success transition.
|
||||
"""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("deploy", wi="ET-012")
|
||||
res = advance_stage(
|
||||
task_id, "deploy", "enduro-trails", "ET-012",
|
||||
"feature/ET-012-x", finished_agent="deployer",
|
||||
)
|
||||
assert res.advanced is True
|
||||
assert _stage(task_id) == "done"
|
||||
# The terminal Plane sync was invoked with the work item id.
|
||||
stage_engine.set_issue_done.assert_called_once_with("ET-012")
|
||||
|
||||
def test_non_terminal_advance_does_not_force_plane_done(self, monkeypatch):
|
||||
"""set_issue_done must only fire on the terminal deploy->done transition."""
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{k: _pass for k in stage_engine.QG_CHECKS},
|
||||
)
|
||||
task_id = _make_task("review")
|
||||
advance_stage(
|
||||
task_id, "review", "enduro-trails", "ET-001",
|
||||
"feature/ET-001-x", finished_agent=None,
|
||||
)
|
||||
stage_engine.set_issue_done.assert_not_called()
|
||||
|
||||
def test_done_is_terminal(self):
|
||||
task_id = _make_task("done")
|
||||
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",
|
||||
@@ -300,6 +335,59 @@ class TestTesterFail:
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 8: deploy verdict gates deploy -> done (not the LLM exit code)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDeployVerdict:
|
||||
"""deploy -> done must be gated on check_deploy_status (the deployer's
|
||||
machine-readable verdict), NOT on the LLM exit code (always 0)."""
|
||||
|
||||
def test_failed_verdict_rolls_back_to_development(self, monkeypatch):
|
||||
# deployer finished (exit_code 0 from launcher), but verdict is FAILED.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _fail("Deploy status: FAILED")},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is False
|
||||
assert res.rolled_back_to == "development"
|
||||
assert _stage(task_id) == "development" # NOT done
|
||||
assert res.alerted is True
|
||||
assert stage_engine.set_issue_blocked.called
|
||||
assert stage_engine.send_telegram.called
|
||||
|
||||
def test_no_deploy_log_rolls_back(self, monkeypatch):
|
||||
# No frontmatter field / no file -> check returns False -> rollback.
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _fail("Deploy log not found (14-deploy-log.md)")},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is False
|
||||
assert _stage(task_id) == "development"
|
||||
|
||||
def test_success_verdict_advances_to_done(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
stage_engine, "QG_CHECKS",
|
||||
{**stage_engine.QG_CHECKS,
|
||||
"check_deploy_status": _pass},
|
||||
)
|
||||
task_id = _make_task("deploy")
|
||||
res = advance_stage(task_id, "deploy", "enduro-trails", "ET-011",
|
||||
"feature/ET-011-x", finished_agent="deployer")
|
||||
assert res.advanced is True
|
||||
assert res.to_stage == "done"
|
||||
assert _stage(task_id) == "done"
|
||||
assert res.enqueued_agent is None # no agent leaves deploy
|
||||
assert _jobs() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Architect conflict -> rollback to analysis + enqueue analyst
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
518
tests/test_telegram_tracker.py
Normal file
518
tests/test_telegram_tracker.py
Normal file
@@ -0,0 +1,518 @@
|
||||
"""feat/telegram-live-tracker: tests for the live Telegram task tracker.
|
||||
|
||||
Covers (per DEV_TASK_TELEGRAM_TRACKER.md):
|
||||
* short_model_name: provider/claude- prefix trimming.
|
||||
* render_task_tracker: per-stage line format (in↓/out↑, model, cost, minutes),
|
||||
the "⏸️ Ревью БРД · твоё время" line, the 💰 totals, and the finish block
|
||||
(⏱️ three times + 🔗/📦).
|
||||
* first message -> sendMessage stores message_id; transition -> editMessageText.
|
||||
* fallback: editMessageText fails -> a NEW message is sent and the id updated.
|
||||
* which alerts go out SEPARATELY (approve-gate / deploy-fail / agent-fail /
|
||||
error) vs which do NOT (QG-pending / agent-start / stage-transition).
|
||||
|
||||
Isolated temp DB; no network (httpx is patched).
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
||||
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
||||
|
||||
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_tracker.db")
|
||||
os.environ["ORCH_DB_PATH"] = _test_db
|
||||
|
||||
from unittest.mock import MagicMock, patch # noqa: E402
|
||||
|
||||
import pytest # noqa: E402
|
||||
|
||||
import src.db as db_module # noqa: E402
|
||||
from src.db import init_db, get_db # noqa: E402
|
||||
from src import notifications as N # noqa: E402
|
||||
from src import usage as U # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_db(monkeypatch):
|
||||
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
init_db()
|
||||
# Re-enable send_telegram (conftest stubs it to a no-op); these tests patch
|
||||
# httpx / the lower-level helpers explicitly per case.
|
||||
yield
|
||||
if os.path.exists(_test_db):
|
||||
os.unlink(_test_db)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# helpers to build a task + runs in the DB
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _mk_task(stage="development", title="\u0422\u0440\u0435\u043a\u0438 \u0441 \u0437\u0443\u043c\u0430 z5",
|
||||
wid="ET-012", brd_start=None, brd_end=None):
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
|
||||
"brd_review_started_at, brd_review_ended_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
("p1", wid, "enduro-trails", "feature/ET-012-x", stage, title,
|
||||
brd_start, brd_end),
|
||||
)
|
||||
tid = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return tid
|
||||
|
||||
|
||||
def _mk_run(task_id, agent, started, finished, in_tok, out_tok,
|
||||
cache_read=0, cache_creation=0, cost=0.0, model=None, exit_code=0):
|
||||
conn = get_db()
|
||||
cur = conn.execute(
|
||||
"INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
|
||||
"exit_code, input_tokens, output_tokens, cache_read_tokens, "
|
||||
"cache_creation_tokens, cost_usd, model) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(task_id, agent, started, finished, exit_code, in_tok, out_tok,
|
||||
cache_read, cache_creation, cost, model),
|
||||
)
|
||||
rid = cur.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return rid
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# short_model_name
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_short_model_name():
|
||||
assert U.short_model_name("tokenator/claude-opus-4-8") == "opus-4-8"
|
||||
assert U.short_model_name("vibecode/claude-sonnet-4.6") == "sonnet-4.6"
|
||||
assert U.short_model_name("claude-opus-4-8") == "opus-4-8"
|
||||
assert U.short_model_name("opus-4-8") == "opus-4-8"
|
||||
assert U.short_model_name(None) == ""
|
||||
assert U.short_model_name("") == ""
|
||||
|
||||
|
||||
def test_parse_usage_extracts_model_from_modelusage():
|
||||
blob = (
|
||||
'{"total_cost_usd":0.01,'
|
||||
'"usage":{"input_tokens":10,"output_tokens":5},'
|
||||
'"modelUsage":{"claude-opus-4-8":{"inputTokens":10,"outputTokens":5}}}'
|
||||
)
|
||||
u = U.parse_usage_from_text(blob)
|
||||
assert u["model"] == "claude-opus-4-8"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# render_task_tracker
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_render_in_progress_stage_lines_and_totals():
|
||||
tid = _mk_task(stage="deploy", brd_start="2026-06-04 10:00:00",
|
||||
brd_end="2026-06-04 10:08:00")
|
||||
# Analysis: 10м, 1.1M in (mostly cache) / 39.6k out, $2.38, opus-4-8
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "architect", "2026-06-04 10:08:00", "2026-06-04 10:17:00",
|
||||
in_tok=500, out_tok=34400, cache_read=1_500_000, cost=2.24,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "developer", "2026-06-04 10:17:00", "2026-06-04 10:28:00",
|
||||
in_tok=400, out_tok=45800, cache_read=8_400_000, cost=7.29,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "reviewer", "2026-06-04 10:28:00", "2026-06-04 10:31:00",
|
||||
in_tok=300, out_tok=12900, cache_read=1_200_000, cost=1.53,
|
||||
model="vibecode/claude-sonnet-4.6")
|
||||
_mk_run(tid, "tester", "2026-06-04 10:31:00", "2026-06-04 10:36:00",
|
||||
in_tok=200, out_tok=19500, cache_read=1_200_000, cost=1.51,
|
||||
model="vibecode/claude-sonnet-4.6")
|
||||
# deployer started but not finished -> active "идёт" line.
|
||||
_mk_run(tid, "deployer", "2026-06-04 10:36:00", None,
|
||||
in_tok=0, out_tok=0, model=None, exit_code=None)
|
||||
|
||||
text = N.render_task_tracker(tid)
|
||||
|
||||
# Header in-progress
|
||||
assert text.startswith("\U0001f6e0\ufe0f ET-012 \u00b7 \u0422\u0440\u0435\u043a\u0438")
|
||||
# Per-stage format: in↓/out↑ · cost · model
|
||||
assert "\u2705 Analysis" in text
|
||||
assert "10\u043c" in text # analysis duration
|
||||
assert "39.6k\u2191" in text # analysis out
|
||||
assert "$2.38" in text
|
||||
assert "opus-4-8" in text
|
||||
assert "sonnet-4.6" in text # reviewer/tester model
|
||||
# BRD review line (human time, ended)
|
||||
assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
|
||||
assert "\u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" in text
|
||||
# Active stage
|
||||
assert "\U0001f504 Deploy" in text
|
||||
assert "\u0438\u0434\u0451\u0442" in text
|
||||
# Totals line present with 💰
|
||||
assert "\U0001f4b0" in text
|
||||
# In-progress: no final ⏱️ line
|
||||
assert "\u0412\u0441\u0435\u0433\u043e" not in text
|
||||
|
||||
|
||||
def test_render_brd_review_waiting_shows_hourglass():
|
||||
tid = _mk_task(stage="analysis", brd_start="2026-06-04 10:00:00",
|
||||
brd_end=None)
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
text = N.render_task_tracker(tid)
|
||||
assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
|
||||
assert "\u23f3" in text # hourglass while waiting
|
||||
|
||||
|
||||
def test_render_done_has_times_and_links():
|
||||
tid = _mk_task(stage="done", brd_start="2026-06-04 10:00:00",
|
||||
brd_end="2026-06-04 10:08:00")
|
||||
# set created/updated to compute wall clock
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE tasks SET created_at='2026-06-04 09:00:00', "
|
||||
"updated_at='2026-06-04 09:56:00' WHERE id=?", (tid,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "deployer", "2026-06-04 09:50:00", "2026-06-04 09:56:00",
|
||||
in_tok=400, out_tok=22400, cache_read=1_600_000, cost=1.73,
|
||||
model="tokenator/claude-opus-4-8")
|
||||
|
||||
with patch("src.notifications.httpx") as _hx:
|
||||
# No PR found -> just "📦 deployed"
|
||||
_resp = MagicMock(status_code=200)
|
||||
_resp.json.return_value = []
|
||||
_hx.get.return_value = _resp
|
||||
text = N.render_task_tracker(tid)
|
||||
|
||||
assert text.startswith("\U0001f389 ET-012")
|
||||
assert "\u0413\u041e\u0422\u041e\u0412\u041e" in text
|
||||
# ⏱️ with three times
|
||||
assert "\u23f1\ufe0f" in text
|
||||
assert "\u0412\u0441\u0435\u0433\u043e" in text
|
||||
assert "\u0430\u0433\u0435\u043d\u0442\u044b" in text
|
||||
assert "\u0442\u0432\u043e\u0451" in text
|
||||
# 📦 deployed line
|
||||
assert "\U0001f4e6" in text
|
||||
|
||||
|
||||
def test_render_escapes_html_in_title():
|
||||
tid = _mk_task(stage="analysis", title="A <b>& B</b>")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.0)
|
||||
text = N.render_task_tracker(tid)
|
||||
assert "<b>" in text
|
||||
assert "&" in text
|
||||
|
||||
|
||||
def test_render_omits_model_when_unknown():
|
||||
tid = _mk_task(stage="analysis")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.0, model=None)
|
||||
text = N.render_task_tracker(tid)
|
||||
# No trailing " · <model>" — line ends at cost.
|
||||
line = [l for l in text.splitlines() if l.startswith("\u2705 Analysis")][0]
|
||||
assert line.rstrip().endswith("$0.00")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# tracker send / edit / fallback
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_first_call_sends_message_and_stores_id(monkeypatch):
|
||||
tid = _mk_task(stage="analysis")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", None, in_tok=0, out_tok=0,
|
||||
exit_code=None)
|
||||
|
||||
sent = {}
|
||||
def _fake_send(text, disable_notification=False):
|
||||
sent["text"] = text
|
||||
sent["silent"] = disable_notification
|
||||
return 555
|
||||
monkeypatch.setattr(N, "send_telegram", _fake_send)
|
||||
monkeypatch.setattr(N, "edit_telegram", lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not edit on first call")))
|
||||
|
||||
N.update_task_tracker(tid)
|
||||
|
||||
from src.db import get_tracker_message_id
|
||||
assert get_tracker_message_id(tid) == 555
|
||||
assert sent["silent"] is True # tracker is silent
|
||||
|
||||
|
||||
def test_second_call_edits_existing_message(monkeypatch):
|
||||
tid = _mk_task(stage="development")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1)
|
||||
from src.db import set_tracker_message_id
|
||||
set_tracker_message_id(tid, 777)
|
||||
|
||||
edited = {}
|
||||
monkeypatch.setattr(N, "edit_telegram",
|
||||
lambda mid, text: edited.update(mid=mid) or N.EDIT_OK)
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not send when edit succeeds")))
|
||||
|
||||
N.update_task_tracker(tid)
|
||||
assert edited["mid"] == 777
|
||||
|
||||
|
||||
def test_fallback_to_new_message_when_edit_gone(monkeypatch):
|
||||
"""edit returns 'gone' (message deleted/too old) -> send NEW + update id."""
|
||||
tid = _mk_task(stage="development")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1)
|
||||
from src.db import set_tracker_message_id, get_tracker_message_id
|
||||
set_tracker_message_id(tid, 100)
|
||||
|
||||
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_GONE)
|
||||
monkeypatch.setattr(N, "send_telegram", lambda text, disable_notification=False: 200)
|
||||
|
||||
N.update_task_tracker(tid)
|
||||
assert get_tracker_message_id(tid) == 200 # id updated to the new message
|
||||
|
||||
|
||||
def test_not_modified_does_not_send_new_message(monkeypatch):
|
||||
"""edit returns 'not_modified' -> NO new message, id unchanged (no dupe)."""
|
||||
tid = _mk_task(stage="development")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1)
|
||||
from src.db import set_tracker_message_id, get_tracker_message_id
|
||||
set_tracker_message_id(tid, 100)
|
||||
|
||||
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_NOT_MODIFIED)
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda *a, **k: (_ for _ in ()).throw(AssertionError("must not send on not_modified")))
|
||||
|
||||
N.update_task_tracker(tid)
|
||||
assert get_tracker_message_id(tid) == 100 # unchanged, no duplicate
|
||||
|
||||
|
||||
def test_transient_edit_failure_does_not_send_new_message(monkeypatch):
|
||||
"""edit returns 'failed' (network/timeout/5xx) -> NO new message (no dupe)."""
|
||||
tid = _mk_task(stage="development")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1)
|
||||
from src.db import set_tracker_message_id, get_tracker_message_id
|
||||
set_tracker_message_id(tid, 100)
|
||||
|
||||
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_FAILED)
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda *a, **k: (_ for _ in ()).throw(AssertionError("must not send on transient failure")))
|
||||
|
||||
N.update_task_tracker(tid)
|
||||
assert get_tracker_message_id(tid) == 100 # unchanged, no duplicate
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# edit_telegram outcome classification (httpx mocked)
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _edit_resp(ok, description=None):
|
||||
resp = MagicMock()
|
||||
body = {"ok": ok}
|
||||
if description is not None:
|
||||
body["description"] = description
|
||||
resp.json.return_value = body
|
||||
return resp
|
||||
|
||||
|
||||
def _patch_tg_creds(monkeypatch):
|
||||
monkeypatch.setattr(N._get_settings(), "telegram_bot_token", "T", raising=False)
|
||||
monkeypatch.setattr(N._get_settings(), "telegram_chat_id", "C", raising=False)
|
||||
|
||||
|
||||
def test_edit_telegram_ok(monkeypatch):
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(True)
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_OK
|
||||
|
||||
|
||||
def test_edit_telegram_not_modified_is_success(monkeypatch):
|
||||
# 400 "message is not modified" -> success, not gone, no duplicate
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(
|
||||
False, "Bad Request: message is not modified: ...")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_NOT_MODIFIED
|
||||
|
||||
|
||||
def test_edit_telegram_exactly_the_same_is_not_modified(monkeypatch):
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(
|
||||
False, "Bad Request: specified new message content and reply markup "
|
||||
"are exactly the same")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_NOT_MODIFIED
|
||||
|
||||
|
||||
def test_edit_telegram_message_not_found_is_gone(monkeypatch):
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(
|
||||
False, "Bad Request: message to edit not found")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_GONE
|
||||
|
||||
|
||||
def test_edit_telegram_cant_be_edited_is_gone(monkeypatch):
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(
|
||||
False, "Bad Request: message can't be edited")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_GONE
|
||||
|
||||
|
||||
def test_edit_telegram_unknown_400_is_failed(monkeypatch):
|
||||
# unknown 400 -> failed (NOT gone) -> caller won't duplicate
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(
|
||||
False, "Bad Request: some other unexpected error")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
|
||||
|
||||
|
||||
def test_edit_telegram_timeout_is_failed(monkeypatch):
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.side_effect = Exception("read timeout")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
|
||||
|
||||
|
||||
def test_edit_telegram_5xx_is_failed(monkeypatch):
|
||||
# Telegram 5xx still returns ok:false w/o gone/not_modified markers
|
||||
_patch_tg_creds(monkeypatch)
|
||||
with patch("src.notifications.httpx") as hx:
|
||||
hx.post.return_value = _edit_resp(False, "Internal Server Error")
|
||||
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# render: repeated stage attempt shows "попытка N"
|
||||
# --------------------------------------------------------------------------- #
|
||||
_POPYTKA = "\u043f\u043e\u043f\u044b\u0442\u043a\u0430" # popytka
|
||||
|
||||
|
||||
def test_render_active_stage_shows_attempt_on_second_run():
|
||||
# Two reviewer runs while in review -> active line shows attempt 2.
|
||||
tid = _mk_task(stage="review")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
# First review run finished (sent back to dev), second review run active.
|
||||
_mk_run(tid, "reviewer", "2026-06-04 09:20:00", "2026-06-04 09:25:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="vibecode/claude-sonnet-4.6",
|
||||
exit_code=0)
|
||||
_mk_run(tid, "reviewer", "2026-06-04 09:30:00", None,
|
||||
in_tok=0, out_tok=0, exit_code=None)
|
||||
|
||||
text = N.render_task_tracker(tid)
|
||||
active = [l for l in text.splitlines()
|
||||
if l.startswith("\U0001f504") and "Review" in l][0]
|
||||
assert _POPYTKA in active
|
||||
assert "2" in active
|
||||
assert "\u0438\u0434\u0451\u0442" in active
|
||||
|
||||
|
||||
def test_render_active_stage_no_attempt_on_first_run():
|
||||
# Single reviewer run -> active line has NO attempt marker.
|
||||
tid = _mk_task(stage="review")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "reviewer", "2026-06-04 09:20:00", None,
|
||||
in_tok=0, out_tok=0, exit_code=None)
|
||||
|
||||
text = N.render_task_tracker(tid)
|
||||
active = [l for l in text.splitlines()
|
||||
if l.startswith("\U0001f504") and "Review" in l][0]
|
||||
assert _POPYTKA not in active
|
||||
assert "\u0438\u0434\u0451\u0442" in active
|
||||
|
||||
|
||||
def test_render_finished_lines_unaffected_by_attempt_logic():
|
||||
# Completed (checkmark) lines never carry an attempt marker.
|
||||
tid = _mk_task(stage="review")
|
||||
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
# developer ran twice (retry) but is a FINISHED stage now.
|
||||
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:15:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
_mk_run(tid, "developer", "2026-06-04 09:16:00", "2026-06-04 09:20:00",
|
||||
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
|
||||
text = N.render_task_tracker(tid)
|
||||
for l in text.splitlines():
|
||||
if l.startswith("\u2705"):
|
||||
assert _POPYTKA not in l
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# which alerts are SEPARATE vs tracker-only
|
||||
# --------------------------------------------------------------------------- #
|
||||
def test_approve_gate_sends_separate_message_and_starts_brd_clock(monkeypatch):
|
||||
tid = _mk_task(stage="analysis")
|
||||
calls = []
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
|
||||
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: None)
|
||||
|
||||
N.notify_approve_requested(tid)
|
||||
|
||||
# exactly one SEPARATE (notifying) send for the approve gate
|
||||
assert len(calls) == 1
|
||||
assert calls[0][1] is False # notifying
|
||||
assert "Approved" in calls[0][0]
|
||||
# BRD clock started
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT brd_review_started_at FROM tasks WHERE id=?", (tid,)).fetchone()
|
||||
conn.close()
|
||||
assert row[0] is not None
|
||||
|
||||
|
||||
def test_error_sends_separate_message(monkeypatch):
|
||||
tid = _mk_task(stage="development")
|
||||
calls = []
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
|
||||
N.notify_error(tid, "boom")
|
||||
assert len(calls) == 1
|
||||
assert calls[0][1] is False # notifying
|
||||
assert "ERROR" in calls[0][0]
|
||||
|
||||
|
||||
def test_stage_change_does_not_send_separate_message(monkeypatch):
|
||||
tid = _mk_task(stage="development")
|
||||
sent = []
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda text, disable_notification=False: sent.append(text) or 1)
|
||||
# tracker refresh is allowed (edit/send silent) but must NOT use send_telegram
|
||||
# for a separate notification; stub update to isolate.
|
||||
refreshed = []
|
||||
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
|
||||
|
||||
N.notify_stage_change(tid, "development", "review")
|
||||
assert sent == [] # no separate message
|
||||
assert refreshed == [tid] # tracker refreshed instead
|
||||
|
||||
|
||||
def test_agent_started_does_not_send_separate_message(monkeypatch):
|
||||
tid = _mk_task(stage="analysis")
|
||||
sent = []
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda text, disable_notification=False: sent.append(text) or 1)
|
||||
refreshed = []
|
||||
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
|
||||
|
||||
N.notify_agent_started(1, "analyst", tid)
|
||||
assert sent == []
|
||||
assert refreshed == [tid]
|
||||
|
||||
|
||||
def test_qg_failure_does_not_send_separate_message(monkeypatch):
|
||||
tid = _mk_task(stage="development")
|
||||
sent = []
|
||||
monkeypatch.setattr(N, "send_telegram",
|
||||
lambda text, disable_notification=False: sent.append(text) or 1)
|
||||
N.notify_qg_failure(tid, "development", "check_ci_green", "CI state: pending")
|
||||
assert sent == [] # QG-pending is log-only, never a separate ping
|
||||
@@ -62,9 +62,27 @@ def test_parse_real_result_json():
|
||||
assert u["input_tokens"] == 45231
|
||||
assert u["output_tokens"] == 12100
|
||||
assert u["cache_read_tokens"] == 18500
|
||||
# FIX 2: cache_creation slice must now be parsed (was dropped before).
|
||||
assert u["cache_creation_tokens"] == 7418
|
||||
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
def test_parse_cache_creation_present():
|
||||
u = U.parse_usage_from_text(REAL_RESULT_JSON)
|
||||
assert u["cache_creation_tokens"] == 7418
|
||||
|
||||
|
||||
def test_parse_cache_creation_missing_defaults_zero():
|
||||
blob = (
|
||||
'{"total_cost_usd":0.01,'
|
||||
'"usage":{"input_tokens":10,"output_tokens":5,'
|
||||
'"cache_read_input_tokens":100}}'
|
||||
)
|
||||
u = U.parse_usage_from_text(blob)
|
||||
assert u["cache_creation_tokens"] == 0
|
||||
assert u["cache_read_tokens"] == 100
|
||||
|
||||
|
||||
def test_parse_with_leading_text():
|
||||
"""The agent may print text before the trailing JSON; we still find it."""
|
||||
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
|
||||
@@ -106,13 +124,16 @@ def test_record_usage_writes_columns():
|
||||
U.record_usage(rid, u)
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
|
||||
"SELECT input_tokens, output_tokens, cache_read_tokens, "
|
||||
"cache_creation_tokens, cost_usd "
|
||||
"FROM agent_runs WHERE id=?", (rid,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert row["input_tokens"] == 45231
|
||||
assert row["output_tokens"] == 12100
|
||||
assert row["cache_read_tokens"] == 18500
|
||||
# FIX 2: cache_creation column is now persisted.
|
||||
assert row["cache_creation_tokens"] == 7418
|
||||
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
|
||||
|
||||
|
||||
@@ -144,14 +165,82 @@ def test_fmt_cost():
|
||||
|
||||
|
||||
def test_usage_comment_format():
|
||||
# No cache -> in_total == input_tokens, no cached breakdown shown.
|
||||
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
|
||||
c = U.usage_comment("developer", u)
|
||||
assert "Developer" in c
|
||||
assert "45.2k in" in c
|
||||
assert "cached" not in c
|
||||
assert "12.1k out" in c
|
||||
assert "$0.21" in c
|
||||
|
||||
|
||||
def test_usage_comment_shows_full_input_with_cached():
|
||||
"""FIX 2: in = input + cache_read + cache_creation, with cached breakdown."""
|
||||
u = {
|
||||
"input_tokens": 81,
|
||||
"cache_read_tokens": 8_400_000,
|
||||
"cache_creation_tokens": 100_000,
|
||||
"output_tokens": 45_800,
|
||||
"cost_usd": 7.29,
|
||||
}
|
||||
c = U.usage_comment("developer", u)
|
||||
# total in = 8_500_081 -> 8.5M ; cached = 8_500_000 -> 8.5M
|
||||
assert "8.5M in (8.5M cached)" in c
|
||||
assert "45.8k out" in c
|
||||
assert "$7.29" in c
|
||||
|
||||
|
||||
def test_usage_comment_no_cached_when_zero():
|
||||
u = {"input_tokens": 1234, "cache_read_tokens": 0,
|
||||
"cache_creation_tokens": 0, "output_tokens": 50, "cost_usd": 0.01}
|
||||
c = U.usage_comment("developer", u)
|
||||
assert "1.2k in" in c
|
||||
assert "cached" not in c
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# FIX 4: per-agent artifact links in finish comments
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _ctx():
|
||||
return dict(repo="enduro-trails", branch="feature/ET-012-x",
|
||||
work_item_id="ET-012")
|
||||
|
||||
|
||||
def test_usage_comment_reviewer_links_review_doc():
|
||||
c = U.usage_comment("reviewer", {"input_tokens": 5}, **_ctx())
|
||||
assert "12-review.md" in c
|
||||
assert "ET-012" in c
|
||||
|
||||
|
||||
def test_usage_comment_tester_links_test_report():
|
||||
c = U.usage_comment("tester", {"input_tokens": 5}, **_ctx())
|
||||
assert "13-test-report.md" in c
|
||||
|
||||
|
||||
def test_usage_comment_deployer_links_deploy_log():
|
||||
c = U.usage_comment("deployer", {"input_tokens": 5}, **_ctx())
|
||||
assert "14-deploy-log.md" in c
|
||||
|
||||
|
||||
def test_usage_comment_developer_links_pr_and_branch():
|
||||
c = U.usage_comment("developer", {"input_tokens": 5}, pr_number=7, **_ctx())
|
||||
assert "pulls/7" in c
|
||||
assert "feature/ET-012-x" in c
|
||||
|
||||
|
||||
def test_usage_comment_architect_links_adr():
|
||||
c = U.usage_comment("architect", {"input_tokens": 5}, **_ctx())
|
||||
assert "06-adr" in c
|
||||
|
||||
|
||||
def test_usage_comment_no_links_without_context():
|
||||
"""Without repo/branch context, no links are appended (no crash)."""
|
||||
c = U.usage_comment("reviewer", {"input_tokens": 5})
|
||||
assert "12-review.md" not in c
|
||||
assert "http" not in c
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# task summary
|
||||
# --------------------------------------------------------------------------- #
|
||||
@@ -174,3 +263,47 @@ def test_task_summary_aggregates_over_agents():
|
||||
assert "$0.15" in comment # total cost
|
||||
assert "Developer" in comment
|
||||
assert "Tester" in comment
|
||||
|
||||
|
||||
def test_task_summary_sums_all_three_input_components():
|
||||
"""FIX 2: total_in = SUM(input + cache_read + cache_creation); total_cached too."""
|
||||
rid = _new_run(agent="developer", task_id=77)
|
||||
U.record_usage(rid, {
|
||||
"input_tokens": 100,
|
||||
"cache_read_tokens": 2000,
|
||||
"cache_creation_tokens": 900,
|
||||
"output_tokens": 50,
|
||||
"cost_usd": 0.10,
|
||||
})
|
||||
rid2 = _new_run(agent="tester", task_id=77)
|
||||
U.record_usage(rid2, {
|
||||
"input_tokens": 10,
|
||||
"cache_read_tokens": 500,
|
||||
"cache_creation_tokens": 0,
|
||||
"output_tokens": 5,
|
||||
"cost_usd": 0.05,
|
||||
})
|
||||
s = U.task_usage_summary(77)
|
||||
# total_in = (100+2000+900) + (10+500+0) = 3510
|
||||
assert s["total_in"] == 3510
|
||||
# total_cached = (2000+900) + (500+0) = 3400
|
||||
assert s["total_cached"] == 3400
|
||||
assert s["total_out"] == 55
|
||||
comment = U.task_summary_comment(77)
|
||||
assert "cached" in comment
|
||||
|
||||
|
||||
def test_task_summary_handles_null_cache_creation():
|
||||
"""Pre-existing rows (NULL cache_creation) must not break aggregation."""
|
||||
rid = _new_run(agent="developer", task_id=88)
|
||||
conn = get_db()
|
||||
conn.execute(
|
||||
"UPDATE agent_runs SET input_tokens=100, cache_read_tokens=200, "
|
||||
"cache_creation_tokens=NULL, output_tokens=10, cost_usd=0.01 WHERE id=?",
|
||||
(rid,),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
s = U.task_usage_summary(88) # must not raise
|
||||
assert s["total_in"] == 300 # 100 + 200 + (NULL->0)
|
||||
assert s["total_cached"] == 200
|
||||
|
||||
@@ -433,3 +433,67 @@ def test_ci_failure_development_escalates_at_limit(
|
||||
assert "after CI failure" in err_msg
|
||||
# Stage untouched.
|
||||
assert not mock_update_stage.called
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BUG 8 (second door): a merged-PR webhook must NOT fake-complete a task that is
|
||||
# still in the deploy stage. On `deploy` done is gated by the deployer's verdict
|
||||
# (check_deploy_status via advance_stage), not by the merge event. For every
|
||||
# other stage the merge->done behaviour is preserved. Pure-logic tests: invoke
|
||||
# handle_pr() directly with mocked helpers (no HMAC barrier).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _merged_pr_payload(branch="feature/ET-012-x"):
|
||||
return {
|
||||
"action": "closed",
|
||||
"pull_request": {
|
||||
"merged": True,
|
||||
"number": 7,
|
||||
"head": {"ref": branch},
|
||||
},
|
||||
"repository": {"name": "enduro-trails"},
|
||||
}
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_stage_change")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_merge_on_deploy_stage_does_not_set_done(
|
||||
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||
):
|
||||
"""FIX 1: merge at deploy stage is ignored — done is gated by deployer verdict."""
|
||||
from src.webhooks.gitea import handle_pr
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 1, "stage": "deploy", "work_item_id": "ET-012",
|
||||
}
|
||||
|
||||
asyncio.run(handle_pr(_merged_pr_payload()))
|
||||
|
||||
# The merge-driven done path must NOT run on deploy.
|
||||
assert not mock_update_stage.called
|
||||
assert not mock_notify.called
|
||||
|
||||
|
||||
@patch("src.webhooks.gitea.notify_stage_change")
|
||||
@patch("src.webhooks.gitea.update_task_stage")
|
||||
@patch("src.webhooks.gitea.get_task_by_repo_branch")
|
||||
@patch("src.webhooks.gitea.get_project_by_repo")
|
||||
def test_merge_on_non_deploy_stage_sets_done(
|
||||
mock_proj, mock_task, mock_update_stage, mock_notify,
|
||||
):
|
||||
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
|
||||
from src.webhooks.gitea import handle_pr
|
||||
|
||||
mock_proj.return_value = {"repo": "enduro-trails"}
|
||||
mock_task.return_value = {
|
||||
"id": 2, "stage": "review", "work_item_id": "ET-013",
|
||||
}
|
||||
|
||||
asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))
|
||||
|
||||
# Non-deploy stages still get the merge-driven done.
|
||||
mock_update_stage.assert_called_once_with(2, "done")
|
||||
assert mock_notify.called
|
||||
|
||||
Reference in New Issue
Block a user